debug message tweaks
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #if defined(GRAN) || defined(PARALLEL_HASKELL)
35 # include "GranSimRts.h"
36 # include "GranSim.h"
37 # include "ParallelRts.h"
38 # include "Parallel.h"
39 # include "ParallelDebug.h"
40 # include "FetchMe.h"
41 # include "HLC.h"
42 #endif
43 #include "Sparks.h"
44 #include "Capability.h"
45 #include "Task.h"
46 #include "AwaitEvent.h"
47 #if defined(mingw32_HOST_OS)
48 #include "win32/IOManager.h"
49 #endif
50 #include "Trace.h"
51 #include "RaiseAsync.h"
52 #include "Threads.h"
53 #include "ThrIOManager.h"
54
55 #ifdef HAVE_SYS_TYPES_H
56 #include <sys/types.h>
57 #endif
58 #ifdef HAVE_UNISTD_H
59 #include <unistd.h>
60 #endif
61
62 #include <string.h>
63 #include <stdlib.h>
64 #include <stdarg.h>
65
66 #ifdef HAVE_ERRNO_H
67 #include <errno.h>
68 #endif
69
70 // Turn off inlining when debugging - it obfuscates things
71 #ifdef DEBUG
72 # undef  STATIC_INLINE
73 # define STATIC_INLINE static
74 #endif
75
76 /* -----------------------------------------------------------------------------
77  * Global variables
78  * -------------------------------------------------------------------------- */
79
80 #if defined(GRAN)
81
82 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
83 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
84
85 /* 
86    In GranSim we have a runnable and a blocked queue for each processor.
87    In order to minimise code changes new arrays run_queue_hds/tls
88    are created. run_queue_hd is then a short cut (macro) for
89    run_queue_hds[CurrentProc] (see GranSim.h).
90    -- HWL
91 */
92 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
93 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
94 StgTSO *ccalling_threadss[MAX_PROC];
95 /* We use the same global list of threads (all_threads) in GranSim as in
96    the std RTS (i.e. we are cheating). However, we don't use this list in
97    the GranSim specific code at the moment (so we are only potentially
98    cheating).  */
99
100 #else /* !GRAN */
101
102 #if !defined(THREADED_RTS)
103 // Blocked/sleeping thrads
104 StgTSO *blocked_queue_hd = NULL;
105 StgTSO *blocked_queue_tl = NULL;
106 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
107 #endif
108
109 /* Threads blocked on blackholes.
110  * LOCK: sched_mutex+capability, or all capabilities
111  */
112 StgTSO *blackhole_queue = NULL;
113 #endif
114
115 /* The blackhole_queue should be checked for threads to wake up.  See
116  * Schedule.h for more thorough comment.
117  * LOCK: none (doesn't matter if we miss an update)
118  */
119 rtsBool blackholes_need_checking = rtsFalse;
120
121 /* flag set by signal handler to precipitate a context switch
122  * LOCK: none (just an advisory flag)
123  */
124 int context_switch = 0;
125
126 /* flag that tracks whether we have done any execution in this time slice.
127  * LOCK: currently none, perhaps we should lock (but needs to be
128  * updated in the fast path of the scheduler).
129  */
130 nat recent_activity = ACTIVITY_YES;
131
132 /* if this flag is set as well, give up execution
133  * LOCK: none (changes once, from false->true)
134  */
135 rtsBool sched_state = SCHED_RUNNING;
136
137 #if defined(GRAN)
138 StgTSO *CurrentTSO;
139 #endif
140
141 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
142  *  exists - earlier gccs apparently didn't.
143  *  -= chak
144  */
145 StgTSO dummy_tso;
146
147 /*
148  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
149  * in an MT setting, needed to signal that a worker thread shouldn't hang around
150  * in the scheduler when it is out of work.
151  */
152 rtsBool shutting_down_scheduler = rtsFalse;
153
154 /*
155  * This mutex protects most of the global scheduler data in
156  * the THREADED_RTS runtime.
157  */
158 #if defined(THREADED_RTS)
159 Mutex sched_mutex;
160 #endif
161
162 #if defined(PARALLEL_HASKELL)
163 StgTSO *LastTSO;
164 rtsTime TimeOfLastYield;
165 rtsBool emitSchedule = rtsTrue;
166 #endif
167
168 #if !defined(mingw32_HOST_OS)
169 #define FORKPROCESS_PRIMOP_SUPPORTED
170 #endif
171
172 /* -----------------------------------------------------------------------------
173  * static function prototypes
174  * -------------------------------------------------------------------------- */
175
176 static Capability *schedule (Capability *initialCapability, Task *task);
177
178 //
179 // These function all encapsulate parts of the scheduler loop, and are
180 // abstracted only to make the structure and control flow of the
181 // scheduler clearer.
182 //
183 static void schedulePreLoop (void);
184 #if defined(THREADED_RTS)
185 static void schedulePushWork(Capability *cap, Task *task);
186 #endif
187 static void scheduleStartSignalHandlers (Capability *cap);
188 static void scheduleCheckBlockedThreads (Capability *cap);
189 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
190 static void scheduleCheckBlackHoles (Capability *cap);
191 static void scheduleDetectDeadlock (Capability *cap, Task *task);
192 #if defined(GRAN)
193 static StgTSO *scheduleProcessEvent(rtsEvent *event);
194 #endif
195 #if defined(PARALLEL_HASKELL)
196 static StgTSO *scheduleSendPendingMessages(void);
197 static void scheduleActivateSpark(void);
198 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
199 #endif
200 #if defined(PAR) || defined(GRAN)
201 static void scheduleGranParReport(void);
202 #endif
203 static void schedulePostRunThread(StgTSO *t);
204 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
205 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
206                                          StgTSO *t);
207 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
208                                     nat prev_what_next );
209 static void scheduleHandleThreadBlocked( StgTSO *t );
210 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
211                                              StgTSO *t );
212 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
213 static Capability *scheduleDoGC(Capability *cap, Task *task,
214                                 rtsBool force_major);
215
216 static rtsBool checkBlackHoles(Capability *cap);
217
218 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
219 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
220
221 static void deleteThread (Capability *cap, StgTSO *tso);
222 static void deleteAllThreads (Capability *cap);
223
224 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
225 static void deleteThread_(Capability *cap, StgTSO *tso);
226 #endif
227
228 #if defined(PARALLEL_HASKELL)
229 StgTSO * createSparkThread(rtsSpark spark);
230 StgTSO * activateSpark (rtsSpark spark);  
231 #endif
232
233 #ifdef DEBUG
234 static char *whatNext_strs[] = {
235   "(unknown)",
236   "ThreadRunGHC",
237   "ThreadInterpret",
238   "ThreadKilled",
239   "ThreadRelocated",
240   "ThreadComplete"
241 };
242 #endif
243
244 /* -----------------------------------------------------------------------------
245  * Putting a thread on the run queue: different scheduling policies
246  * -------------------------------------------------------------------------- */
247
248 STATIC_INLINE void
249 addToRunQueue( Capability *cap, StgTSO *t )
250 {
251 #if defined(PARALLEL_HASKELL)
252     if (RtsFlags.ParFlags.doFairScheduling) { 
253         // this does round-robin scheduling; good for concurrency
254         appendToRunQueue(cap,t);
255     } else {
256         // this does unfair scheduling; good for parallelism
257         pushOnRunQueue(cap,t);
258     }
259 #else
260     // this does round-robin scheduling; good for concurrency
261     appendToRunQueue(cap,t);
262 #endif
263 }
264
265 /* ---------------------------------------------------------------------------
266    Main scheduling loop.
267
268    We use round-robin scheduling, each thread returning to the
269    scheduler loop when one of these conditions is detected:
270
271       * out of heap space
272       * timer expires (thread yields)
273       * thread blocks
274       * thread ends
275       * stack overflow
276
277    GRAN version:
278      In a GranSim setup this loop iterates over the global event queue.
279      This revolves around the global event queue, which determines what 
280      to do next. Therefore, it's more complicated than either the 
281      concurrent or the parallel (GUM) setup.
282
283    GUM version:
284      GUM iterates over incoming messages.
285      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
286      and sends out a fish whenever it has nothing to do; in-between
287      doing the actual reductions (shared code below) it processes the
288      incoming messages and deals with delayed operations 
289      (see PendingFetches).
290      This is not the ugliest code you could imagine, but it's bloody close.
291
292    ------------------------------------------------------------------------ */
293
294 static Capability *
295 schedule (Capability *initialCapability, Task *task)
296 {
297   StgTSO *t;
298   Capability *cap;
299   StgThreadReturnCode ret;
300 #if defined(GRAN)
301   rtsEvent *event;
302 #elif defined(PARALLEL_HASKELL)
303   StgTSO *tso;
304   GlobalTaskId pe;
305   rtsBool receivedFinish = rtsFalse;
306 # if defined(DEBUG)
307   nat tp_size, sp_size; // stats only
308 # endif
309 #endif
310   nat prev_what_next;
311   rtsBool ready_to_gc;
312 #if defined(THREADED_RTS)
313   rtsBool first = rtsTrue;
314 #endif
315   
316   cap = initialCapability;
317
318   // Pre-condition: this task owns initialCapability.
319   // The sched_mutex is *NOT* held
320   // NB. on return, we still hold a capability.
321
322   debugTrace (DEBUG_sched, 
323               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
324               task, initialCapability);
325
326   schedulePreLoop();
327
328   // -----------------------------------------------------------
329   // Scheduler loop starts here:
330
331 #if defined(PARALLEL_HASKELL)
332 #define TERMINATION_CONDITION        (!receivedFinish)
333 #elif defined(GRAN)
334 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
335 #else
336 #define TERMINATION_CONDITION        rtsTrue
337 #endif
338
339   while (TERMINATION_CONDITION) {
340
341 #if defined(GRAN)
342       /* Choose the processor with the next event */
343       CurrentProc = event->proc;
344       CurrentTSO = event->tso;
345 #endif
346
347 #if defined(THREADED_RTS)
348       if (first) {
349           // don't yield the first time, we want a chance to run this
350           // thread for a bit, even if there are others banging at the
351           // door.
352           first = rtsFalse;
353           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
354       } else {
355           // Yield the capability to higher-priority tasks if necessary.
356           yieldCapability(&cap, task);
357       }
358 #endif
359       
360 #if defined(THREADED_RTS)
361       schedulePushWork(cap,task);
362 #endif
363
364     // Check whether we have re-entered the RTS from Haskell without
365     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
366     // call).
367     if (cap->in_haskell) {
368           errorBelch("schedule: re-entered unsafely.\n"
369                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
370           stg_exit(EXIT_FAILURE);
371     }
372
373     // The interruption / shutdown sequence.
374     // 
375     // In order to cleanly shut down the runtime, we want to:
376     //   * make sure that all main threads return to their callers
377     //     with the state 'Interrupted'.
378     //   * clean up all OS threads assocated with the runtime
379     //   * free all memory etc.
380     //
381     // So the sequence for ^C goes like this:
382     //
383     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
384     //     arranges for some Capability to wake up
385     //
386     //   * all threads in the system are halted, and the zombies are
387     //     placed on the run queue for cleaning up.  We acquire all
388     //     the capabilities in order to delete the threads, this is
389     //     done by scheduleDoGC() for convenience (because GC already
390     //     needs to acquire all the capabilities).  We can't kill
391     //     threads involved in foreign calls.
392     // 
393     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
394     //
395     //   * sched_state := SCHED_SHUTTING_DOWN
396     //
397     //   * all workers exit when the run queue on their capability
398     //     drains.  All main threads will also exit when their TSO
399     //     reaches the head of the run queue and they can return.
400     //
401     //   * eventually all Capabilities will shut down, and the RTS can
402     //     exit.
403     //
404     //   * We might be left with threads blocked in foreign calls, 
405     //     we should really attempt to kill these somehow (TODO);
406     
407     switch (sched_state) {
408     case SCHED_RUNNING:
409         break;
410     case SCHED_INTERRUPTING:
411         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
412 #if defined(THREADED_RTS)
413         discardSparksCap(cap);
414 #endif
415         /* scheduleDoGC() deletes all the threads */
416         cap = scheduleDoGC(cap,task,rtsFalse);
417         break;
418     case SCHED_SHUTTING_DOWN:
419         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
420         // If we are a worker, just exit.  If we're a bound thread
421         // then we will exit below when we've removed our TSO from
422         // the run queue.
423         if (task->tso == NULL && emptyRunQueue(cap)) {
424             return cap;
425         }
426         break;
427     default:
428         barf("sched_state: %d", sched_state);
429     }
430
431 #if defined(THREADED_RTS)
432     // If the run queue is empty, take a spark and turn it into a thread.
433     {
434         if (emptyRunQueue(cap)) {
435             StgClosure *spark;
436             spark = findSpark(cap);
437             if (spark != NULL) {
438                 debugTrace(DEBUG_sched,
439                            "turning spark of closure %p into a thread",
440                            (StgClosure *)spark);
441                 createSparkThread(cap,spark);     
442             }
443         }
444     }
445 #endif // THREADED_RTS
446
447     scheduleStartSignalHandlers(cap);
448
449     // Only check the black holes here if we've nothing else to do.
450     // During normal execution, the black hole list only gets checked
451     // at GC time, to avoid repeatedly traversing this possibly long
452     // list each time around the scheduler.
453     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
454
455     scheduleCheckWakeupThreads(cap);
456
457     scheduleCheckBlockedThreads(cap);
458
459     scheduleDetectDeadlock(cap,task);
460 #if defined(THREADED_RTS)
461     cap = task->cap;    // reload cap, it might have changed
462 #endif
463
464     // Normally, the only way we can get here with no threads to
465     // run is if a keyboard interrupt received during 
466     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
467     // Additionally, it is not fatal for the
468     // threaded RTS to reach here with no threads to run.
469     //
470     // win32: might be here due to awaitEvent() being abandoned
471     // as a result of a console event having been delivered.
472     if ( emptyRunQueue(cap) ) {
473 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
474         ASSERT(sched_state >= SCHED_INTERRUPTING);
475 #endif
476         continue; // nothing to do
477     }
478
479 #if defined(PARALLEL_HASKELL)
480     scheduleSendPendingMessages();
481     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
482         continue;
483
484 #if defined(SPARKS)
485     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
486 #endif
487
488     /* If we still have no work we need to send a FISH to get a spark
489        from another PE */
490     if (emptyRunQueue(cap)) {
491         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
492         ASSERT(rtsFalse); // should not happen at the moment
493     }
494     // from here: non-empty run queue.
495     //  TODO: merge above case with this, only one call processMessages() !
496     if (PacketsWaiting()) {  /* process incoming messages, if
497                                 any pending...  only in else
498                                 because getRemoteWork waits for
499                                 messages as well */
500         receivedFinish = processMessages();
501     }
502 #endif
503
504 #if defined(GRAN)
505     scheduleProcessEvent(event);
506 #endif
507
508     // 
509     // Get a thread to run
510     //
511     t = popRunQueue(cap);
512
513 #if defined(GRAN) || defined(PAR)
514     scheduleGranParReport(); // some kind of debuging output
515 #else
516     // Sanity check the thread we're about to run.  This can be
517     // expensive if there is lots of thread switching going on...
518     IF_DEBUG(sanity,checkTSO(t));
519 #endif
520
521 #if defined(THREADED_RTS)
522     // Check whether we can run this thread in the current task.
523     // If not, we have to pass our capability to the right task.
524     {
525         Task *bound = t->bound;
526       
527         if (bound) {
528             if (bound == task) {
529                 debugTrace(DEBUG_sched,
530                            "### Running thread %lu in bound thread", (unsigned long)t->id);
531                 // yes, the Haskell thread is bound to the current native thread
532             } else {
533                 debugTrace(DEBUG_sched,
534                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
535                 // no, bound to a different Haskell thread: pass to that thread
536                 pushOnRunQueue(cap,t);
537                 continue;
538             }
539         } else {
540             // The thread we want to run is unbound.
541             if (task->tso) { 
542                 debugTrace(DEBUG_sched,
543                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
544                 // no, the current native thread is bound to a different
545                 // Haskell thread, so pass it to any worker thread
546                 pushOnRunQueue(cap,t);
547                 continue; 
548             }
549         }
550     }
551 #endif
552
553     /* context switches are initiated by the timer signal, unless
554      * the user specified "context switch as often as possible", with
555      * +RTS -C0
556      */
557     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
558         && !emptyThreadQueues(cap)) {
559         context_switch = 1;
560     }
561          
562 run_thread:
563
564     // CurrentTSO is the thread to run.  t might be different if we
565     // loop back to run_thread, so make sure to set CurrentTSO after
566     // that.
567     cap->r.rCurrentTSO = t;
568
569     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
570                               (long)t->id, whatNext_strs[t->what_next]);
571
572     startHeapProfTimer();
573
574     // Check for exceptions blocked on this thread
575     maybePerformBlockedException (cap, t);
576
577     // ----------------------------------------------------------------------
578     // Run the current thread 
579
580     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
581     ASSERT(t->cap == cap);
582
583     prev_what_next = t->what_next;
584
585     errno = t->saved_errno;
586 #if mingw32_HOST_OS
587     SetLastError(t->saved_winerror);
588 #endif
589
590     cap->in_haskell = rtsTrue;
591
592     dirty_TSO(cap,t);
593
594 #if defined(THREADED_RTS)
595     if (recent_activity == ACTIVITY_DONE_GC) {
596         // ACTIVITY_DONE_GC means we turned off the timer signal to
597         // conserve power (see #1623).  Re-enable it here.
598         nat prev;
599         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
600         if (prev == ACTIVITY_DONE_GC) {
601             startTimer();
602         }
603     } else {
604         recent_activity = ACTIVITY_YES;
605     }
606 #endif
607
608     switch (prev_what_next) {
609         
610     case ThreadKilled:
611     case ThreadComplete:
612         /* Thread already finished, return to scheduler. */
613         ret = ThreadFinished;
614         break;
615         
616     case ThreadRunGHC:
617     {
618         StgRegTable *r;
619         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
620         cap = regTableToCapability(r);
621         ret = r->rRet;
622         break;
623     }
624     
625     case ThreadInterpret:
626         cap = interpretBCO(cap);
627         ret = cap->r.rRet;
628         break;
629         
630     default:
631         barf("schedule: invalid what_next field");
632     }
633
634     cap->in_haskell = rtsFalse;
635
636     // The TSO might have moved, eg. if it re-entered the RTS and a GC
637     // happened.  So find the new location:
638     t = cap->r.rCurrentTSO;
639
640     // We have run some Haskell code: there might be blackhole-blocked
641     // threads to wake up now.
642     // Lock-free test here should be ok, we're just setting a flag.
643     if ( blackhole_queue != END_TSO_QUEUE ) {
644         blackholes_need_checking = rtsTrue;
645     }
646     
647     // And save the current errno in this thread.
648     // XXX: possibly bogus for SMP because this thread might already
649     // be running again, see code below.
650     t->saved_errno = errno;
651 #if mingw32_HOST_OS
652     // Similarly for Windows error code
653     t->saved_winerror = GetLastError();
654 #endif
655
656 #if defined(THREADED_RTS)
657     // If ret is ThreadBlocked, and this Task is bound to the TSO that
658     // blocked, we are in limbo - the TSO is now owned by whatever it
659     // is blocked on, and may in fact already have been woken up,
660     // perhaps even on a different Capability.  It may be the case
661     // that task->cap != cap.  We better yield this Capability
662     // immediately and return to normaility.
663     if (ret == ThreadBlocked) {
664         debugTrace(DEBUG_sched,
665                    "--<< thread %lu (%s) stopped: blocked",
666                    (unsigned long)t->id, whatNext_strs[t->what_next]);
667         continue;
668     }
669 #endif
670
671     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
672     ASSERT(t->cap == cap);
673
674     // ----------------------------------------------------------------------
675     
676     // Costs for the scheduler are assigned to CCS_SYSTEM
677     stopHeapProfTimer();
678 #if defined(PROFILING)
679     CCCS = CCS_SYSTEM;
680 #endif
681     
682     schedulePostRunThread(t);
683
684     t = threadStackUnderflow(task,t);
685
686     ready_to_gc = rtsFalse;
687
688     switch (ret) {
689     case HeapOverflow:
690         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
691         break;
692
693     case StackOverflow:
694         scheduleHandleStackOverflow(cap,task,t);
695         break;
696
697     case ThreadYielding:
698         if (scheduleHandleYield(cap, t, prev_what_next)) {
699             // shortcut for switching between compiler/interpreter:
700             goto run_thread; 
701         }
702         break;
703
704     case ThreadBlocked:
705         scheduleHandleThreadBlocked(t);
706         break;
707
708     case ThreadFinished:
709         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
710         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
711         break;
712
713     default:
714       barf("schedule: invalid thread return code %d", (int)ret);
715     }
716
717     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
718       cap = scheduleDoGC(cap,task,rtsFalse);
719     }
720   } /* end of while() */
721 }
722
723 /* ----------------------------------------------------------------------------
724  * Setting up the scheduler loop
725  * ------------------------------------------------------------------------- */
726
727 static void
728 schedulePreLoop(void)
729 {
730 #if defined(GRAN) 
731     /* set up first event to get things going */
732     /* ToDo: assign costs for system setup and init MainTSO ! */
733     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
734               ContinueThread, 
735               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
736     
737     debugTrace (DEBUG_gran,
738                 "GRAN: Init CurrentTSO (in schedule) = %p", 
739                 CurrentTSO);
740     IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
741     
742     if (RtsFlags.GranFlags.Light) {
743         /* Save current time; GranSim Light only */
744         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
745     }      
746 #endif
747 }
748
749 /* -----------------------------------------------------------------------------
750  * schedulePushWork()
751  *
752  * Push work to other Capabilities if we have some.
753  * -------------------------------------------------------------------------- */
754
755 #if defined(THREADED_RTS)
756 static void
757 schedulePushWork(Capability *cap USED_IF_THREADS, 
758                  Task *task      USED_IF_THREADS)
759 {
760     Capability *free_caps[n_capabilities], *cap0;
761     nat i, n_free_caps;
762
763     // migration can be turned off with +RTS -qg
764     if (!RtsFlags.ParFlags.migrate) return;
765
766     // Check whether we have more threads on our run queue, or sparks
767     // in our pool, that we could hand to another Capability.
768     if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
769         && sparkPoolSizeCap(cap) < 2) {
770         return;
771     }
772
773     // First grab as many free Capabilities as we can.
774     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
775         cap0 = &capabilities[i];
776         if (cap != cap0 && tryGrabCapability(cap0,task)) {
777             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
778                 // it already has some work, we just grabbed it at 
779                 // the wrong moment.  Or maybe it's deadlocked!
780                 releaseCapability(cap0);
781             } else {
782                 free_caps[n_free_caps++] = cap0;
783             }
784         }
785     }
786
787     // we now have n_free_caps free capabilities stashed in
788     // free_caps[].  Share our run queue equally with them.  This is
789     // probably the simplest thing we could do; improvements we might
790     // want to do include:
791     //
792     //   - giving high priority to moving relatively new threads, on 
793     //     the gournds that they haven't had time to build up a
794     //     working set in the cache on this CPU/Capability.
795     //
796     //   - giving low priority to moving long-lived threads
797
798     if (n_free_caps > 0) {
799         StgTSO *prev, *t, *next;
800         rtsBool pushed_to_all;
801
802         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
803
804         i = 0;
805         pushed_to_all = rtsFalse;
806
807         if (cap->run_queue_hd != END_TSO_QUEUE) {
808             prev = cap->run_queue_hd;
809             t = prev->_link;
810             prev->_link = END_TSO_QUEUE;
811             for (; t != END_TSO_QUEUE; t = next) {
812                 next = t->_link;
813                 t->_link = END_TSO_QUEUE;
814                 if (t->what_next == ThreadRelocated
815                     || t->bound == task // don't move my bound thread
816                     || tsoLocked(t)) {  // don't move a locked thread
817                     setTSOLink(cap, prev, t);
818                     prev = t;
819                 } else if (i == n_free_caps) {
820                     pushed_to_all = rtsTrue;
821                     i = 0;
822                     // keep one for us
823                     setTSOLink(cap, prev, t);
824                     prev = t;
825                 } else {
826                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
827                     appendToRunQueue(free_caps[i],t);
828                     if (t->bound) { t->bound->cap = free_caps[i]; }
829                     t->cap = free_caps[i];
830                     i++;
831                 }
832             }
833             cap->run_queue_tl = prev;
834         }
835
836         // If there are some free capabilities that we didn't push any
837         // threads to, then try to push a spark to each one.
838         if (!pushed_to_all) {
839             StgClosure *spark;
840             // i is the next free capability to push to
841             for (; i < n_free_caps; i++) {
842                 if (emptySparkPoolCap(free_caps[i])) {
843                     spark = findSpark(cap);
844                     if (spark != NULL) {
845                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
846                         newSpark(&(free_caps[i]->r), spark);
847                     }
848                 }
849             }
850         }
851
852         // release the capabilities
853         for (i = 0; i < n_free_caps; i++) {
854             task->cap = free_caps[i];
855             releaseCapability(free_caps[i]);
856         }
857     }
858     task->cap = cap; // reset to point to our Capability.
859 }
860 #endif
861
862 /* ----------------------------------------------------------------------------
863  * Start any pending signal handlers
864  * ------------------------------------------------------------------------- */
865
866 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
867 static void
868 scheduleStartSignalHandlers(Capability *cap)
869 {
870     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
871         // safe outside the lock
872         startSignalHandlers(cap);
873     }
874 }
875 #else
876 static void
877 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
878 {
879 }
880 #endif
881
882 /* ----------------------------------------------------------------------------
883  * Check for blocked threads that can be woken up.
884  * ------------------------------------------------------------------------- */
885
886 static void
887 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
888 {
889 #if !defined(THREADED_RTS)
890     //
891     // Check whether any waiting threads need to be woken up.  If the
892     // run queue is empty, and there are no other tasks running, we
893     // can wait indefinitely for something to happen.
894     //
895     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
896     {
897         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
898     }
899 #endif
900 }
901
902
903 /* ----------------------------------------------------------------------------
904  * Check for threads woken up by other Capabilities
905  * ------------------------------------------------------------------------- */
906
907 static void
908 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
909 {
910 #if defined(THREADED_RTS)
911     // Any threads that were woken up by other Capabilities get
912     // appended to our run queue.
913     if (!emptyWakeupQueue(cap)) {
914         ACQUIRE_LOCK(&cap->lock);
915         if (emptyRunQueue(cap)) {
916             cap->run_queue_hd = cap->wakeup_queue_hd;
917             cap->run_queue_tl = cap->wakeup_queue_tl;
918         } else {
919             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
920             cap->run_queue_tl = cap->wakeup_queue_tl;
921         }
922         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
923         RELEASE_LOCK(&cap->lock);
924     }
925 #endif
926 }
927
928 /* ----------------------------------------------------------------------------
929  * Check for threads blocked on BLACKHOLEs that can be woken up
930  * ------------------------------------------------------------------------- */
931 static void
932 scheduleCheckBlackHoles (Capability *cap)
933 {
934     if ( blackholes_need_checking ) // check without the lock first
935     {
936         ACQUIRE_LOCK(&sched_mutex);
937         if ( blackholes_need_checking ) {
938             checkBlackHoles(cap);
939             blackholes_need_checking = rtsFalse;
940         }
941         RELEASE_LOCK(&sched_mutex);
942     }
943 }
944
945 /* ----------------------------------------------------------------------------
946  * Detect deadlock conditions and attempt to resolve them.
947  * ------------------------------------------------------------------------- */
948
949 static void
950 scheduleDetectDeadlock (Capability *cap, Task *task)
951 {
952
953 #if defined(PARALLEL_HASKELL)
954     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
955     return;
956 #endif
957
958     /* 
959      * Detect deadlock: when we have no threads to run, there are no
960      * threads blocked, waiting for I/O, or sleeping, and all the
961      * other tasks are waiting for work, we must have a deadlock of
962      * some description.
963      */
964     if ( emptyThreadQueues(cap) )
965     {
966 #if defined(THREADED_RTS)
967         /* 
968          * In the threaded RTS, we only check for deadlock if there
969          * has been no activity in a complete timeslice.  This means
970          * we won't eagerly start a full GC just because we don't have
971          * any threads to run currently.
972          */
973         if (recent_activity != ACTIVITY_INACTIVE) return;
974 #endif
975
976         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
977
978         // Garbage collection can release some new threads due to
979         // either (a) finalizers or (b) threads resurrected because
980         // they are unreachable and will therefore be sent an
981         // exception.  Any threads thus released will be immediately
982         // runnable.
983         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
984
985         recent_activity = ACTIVITY_DONE_GC;
986         // disable timer signals (see #1623)
987         stopTimer();
988         
989         if ( !emptyRunQueue(cap) ) return;
990
991 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
992         /* If we have user-installed signal handlers, then wait
993          * for signals to arrive rather then bombing out with a
994          * deadlock.
995          */
996         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
997             debugTrace(DEBUG_sched,
998                        "still deadlocked, waiting for signals...");
999
1000             awaitUserSignals();
1001
1002             if (signals_pending()) {
1003                 startSignalHandlers(cap);
1004             }
1005
1006             // either we have threads to run, or we were interrupted:
1007             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1008
1009             return;
1010         }
1011 #endif
1012
1013 #if !defined(THREADED_RTS)
1014         /* Probably a real deadlock.  Send the current main thread the
1015          * Deadlock exception.
1016          */
1017         if (task->tso) {
1018             switch (task->tso->why_blocked) {
1019             case BlockedOnSTM:
1020             case BlockedOnBlackHole:
1021             case BlockedOnException:
1022             case BlockedOnMVar:
1023                 throwToSingleThreaded(cap, task->tso, 
1024                                       (StgClosure *)NonTermination_closure);
1025                 return;
1026             default:
1027                 barf("deadlock: main thread blocked in a strange way");
1028             }
1029         }
1030         return;
1031 #endif
1032     }
1033 }
1034
1035 /* ----------------------------------------------------------------------------
1036  * Process an event (GRAN only)
1037  * ------------------------------------------------------------------------- */
1038
1039 #if defined(GRAN)
1040 static StgTSO *
1041 scheduleProcessEvent(rtsEvent *event)
1042 {
1043     StgTSO *t;
1044
1045     if (RtsFlags.GranFlags.Light)
1046       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1047
1048     /* adjust time based on time-stamp */
1049     if (event->time > CurrentTime[CurrentProc] &&
1050         event->evttype != ContinueThread)
1051       CurrentTime[CurrentProc] = event->time;
1052     
1053     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1054     if (!RtsFlags.GranFlags.Light)
1055       handleIdlePEs();
1056
1057     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1058
1059     /* main event dispatcher in GranSim */
1060     switch (event->evttype) {
1061       /* Should just be continuing execution */
1062     case ContinueThread:
1063       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1064       /* ToDo: check assertion
1065       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1066              run_queue_hd != END_TSO_QUEUE);
1067       */
1068       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1069       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1070           procStatus[CurrentProc]==Fetching) {
1071         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1072               CurrentTSO->id, CurrentTSO, CurrentProc);
1073         goto next_thread;
1074       } 
1075       /* Ignore ContinueThreads for completed threads */
1076       if (CurrentTSO->what_next == ThreadComplete) {
1077         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1078               CurrentTSO->id, CurrentTSO, CurrentProc);
1079         goto next_thread;
1080       } 
1081       /* Ignore ContinueThreads for threads that are being migrated */
1082       if (PROCS(CurrentTSO)==Nowhere) { 
1083         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1084               CurrentTSO->id, CurrentTSO, CurrentProc);
1085         goto next_thread;
1086       }
1087       /* The thread should be at the beginning of the run queue */
1088       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1089         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1090               CurrentTSO->id, CurrentTSO, CurrentProc);
1091         break; // run the thread anyway
1092       }
1093       /*
1094       new_event(proc, proc, CurrentTime[proc],
1095                 FindWork,
1096                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1097       goto next_thread; 
1098       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1099       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1100
1101     case FetchNode:
1102       do_the_fetchnode(event);
1103       goto next_thread;             /* handle next event in event queue  */
1104       
1105     case GlobalBlock:
1106       do_the_globalblock(event);
1107       goto next_thread;             /* handle next event in event queue  */
1108       
1109     case FetchReply:
1110       do_the_fetchreply(event);
1111       goto next_thread;             /* handle next event in event queue  */
1112       
1113     case UnblockThread:   /* Move from the blocked queue to the tail of */
1114       do_the_unblock(event);
1115       goto next_thread;             /* handle next event in event queue  */
1116       
1117     case ResumeThread:  /* Move from the blocked queue to the tail of */
1118       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1119       event->tso->gran.blocktime += 
1120         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1121       do_the_startthread(event);
1122       goto next_thread;             /* handle next event in event queue  */
1123       
1124     case StartThread:
1125       do_the_startthread(event);
1126       goto next_thread;             /* handle next event in event queue  */
1127       
1128     case MoveThread:
1129       do_the_movethread(event);
1130       goto next_thread;             /* handle next event in event queue  */
1131       
1132     case MoveSpark:
1133       do_the_movespark(event);
1134       goto next_thread;             /* handle next event in event queue  */
1135       
1136     case FindWork:
1137       do_the_findwork(event);
1138       goto next_thread;             /* handle next event in event queue  */
1139       
1140     default:
1141       barf("Illegal event type %u\n", event->evttype);
1142     }  /* switch */
1143     
1144     /* This point was scheduler_loop in the old RTS */
1145
1146     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1147
1148     TimeOfLastEvent = CurrentTime[CurrentProc];
1149     TimeOfNextEvent = get_time_of_next_event();
1150     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1151     // CurrentTSO = ThreadQueueHd;
1152
1153     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1154                          TimeOfNextEvent));
1155
1156     if (RtsFlags.GranFlags.Light) 
1157       GranSimLight_leave_system(event, &ActiveTSO); 
1158
1159     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1160
1161     IF_DEBUG(gran, 
1162              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1163
1164     /* in a GranSim setup the TSO stays on the run queue */
1165     t = CurrentTSO;
1166     /* Take a thread from the run queue. */
1167     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1168
1169     IF_DEBUG(gran, 
1170              debugBelch("GRAN: About to run current thread, which is\n");
1171              G_TSO(t,5));
1172
1173     context_switch = 0; // turned on via GranYield, checking events and time slice
1174
1175     IF_DEBUG(gran, 
1176              DumpGranEvent(GR_SCHEDULE, t));
1177
1178     procStatus[CurrentProc] = Busy;
1179 }
1180 #endif // GRAN
1181
1182 /* ----------------------------------------------------------------------------
1183  * Send pending messages (PARALLEL_HASKELL only)
1184  * ------------------------------------------------------------------------- */
1185
1186 #if defined(PARALLEL_HASKELL)
1187 static StgTSO *
1188 scheduleSendPendingMessages(void)
1189 {
1190     StgSparkPool *pool;
1191     rtsSpark spark;
1192     StgTSO *t;
1193
1194 # if defined(PAR) // global Mem.Mgmt., omit for now
1195     if (PendingFetches != END_BF_QUEUE) {
1196         processFetches();
1197     }
1198 # endif
1199     
1200     if (RtsFlags.ParFlags.BufferTime) {
1201         // if we use message buffering, we must send away all message
1202         // packets which have become too old...
1203         sendOldBuffers(); 
1204     }
1205 }
1206 #endif
1207
1208 /* ----------------------------------------------------------------------------
1209  * Activate spark threads (PARALLEL_HASKELL only)
1210  * ------------------------------------------------------------------------- */
1211
1212 #if defined(PARALLEL_HASKELL)
1213 static void
1214 scheduleActivateSpark(void)
1215 {
1216 #if defined(SPARKS)
1217   ASSERT(emptyRunQueue());
1218 /* We get here if the run queue is empty and want some work.
1219    We try to turn a spark into a thread, and add it to the run queue,
1220    from where it will be picked up in the next iteration of the scheduler
1221    loop.
1222 */
1223
1224       /* :-[  no local threads => look out for local sparks */
1225       /* the spark pool for the current PE */
1226       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1227       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1228           pool->hd < pool->tl) {
1229         /* 
1230          * ToDo: add GC code check that we really have enough heap afterwards!!
1231          * Old comment:
1232          * If we're here (no runnable threads) and we have pending
1233          * sparks, we must have a space problem.  Get enough space
1234          * to turn one of those pending sparks into a
1235          * thread... 
1236          */
1237
1238         spark = findSpark(rtsFalse);            /* get a spark */
1239         if (spark != (rtsSpark) NULL) {
1240           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1241           IF_PAR_DEBUG(fish, // schedule,
1242                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1243                              tso->id, tso, advisory_thread_count));
1244
1245           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1246             IF_PAR_DEBUG(fish, // schedule,
1247                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1248                             spark));
1249             return rtsFalse; /* failed to generate a thread */
1250           }                  /* otherwise fall through & pick-up new tso */
1251         } else {
1252           IF_PAR_DEBUG(fish, // schedule,
1253                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1254                              spark_queue_len(pool)));
1255           return rtsFalse;  /* failed to generate a thread */
1256         }
1257         return rtsTrue;  /* success in generating a thread */
1258   } else { /* no more threads permitted or pool empty */
1259     return rtsFalse;  /* failed to generateThread */
1260   }
1261 #else
1262   tso = NULL; // avoid compiler warning only
1263   return rtsFalse;  /* dummy in non-PAR setup */
1264 #endif // SPARKS
1265 }
1266 #endif // PARALLEL_HASKELL
1267
1268 /* ----------------------------------------------------------------------------
1269  * Get work from a remote node (PARALLEL_HASKELL only)
1270  * ------------------------------------------------------------------------- */
1271     
1272 #if defined(PARALLEL_HASKELL)
1273 static rtsBool
1274 scheduleGetRemoteWork(rtsBool *receivedFinish)
1275 {
1276   ASSERT(emptyRunQueue());
1277
1278   if (RtsFlags.ParFlags.BufferTime) {
1279         IF_PAR_DEBUG(verbose, 
1280                 debugBelch("...send all pending data,"));
1281         {
1282           nat i;
1283           for (i=1; i<=nPEs; i++)
1284             sendImmediately(i); // send all messages away immediately
1285         }
1286   }
1287 # ifndef SPARKS
1288         //++EDEN++ idle() , i.e. send all buffers, wait for work
1289         // suppress fishing in EDEN... just look for incoming messages
1290         // (blocking receive)
1291   IF_PAR_DEBUG(verbose, 
1292                debugBelch("...wait for incoming messages...\n"));
1293   *receivedFinish = processMessages(); // blocking receive...
1294
1295         // and reenter scheduling loop after having received something
1296         // (return rtsFalse below)
1297
1298 # else /* activate SPARKS machinery */
1299 /* We get here, if we have no work, tried to activate a local spark, but still
1300    have no work. We try to get a remote spark, by sending a FISH message.
1301    Thread migration should be added here, and triggered when a sequence of 
1302    fishes returns without work. */
1303         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1304
1305       /* =8-[  no local sparks => look for work on other PEs */
1306         /*
1307          * We really have absolutely no work.  Send out a fish
1308          * (there may be some out there already), and wait for
1309          * something to arrive.  We clearly can't run any threads
1310          * until a SCHEDULE or RESUME arrives, and so that's what
1311          * we're hoping to see.  (Of course, we still have to
1312          * respond to other types of messages.)
1313          */
1314         rtsTime now = msTime() /*CURRENT_TIME*/;
1315         IF_PAR_DEBUG(verbose, 
1316                      debugBelch("--  now=%ld\n", now));
1317         IF_PAR_DEBUG(fish, // verbose,
1318              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1319                  (last_fish_arrived_at!=0 &&
1320                   last_fish_arrived_at+delay > now)) {
1321                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1322                      now, last_fish_arrived_at+delay, 
1323                      last_fish_arrived_at,
1324                      delay);
1325              });
1326   
1327         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1328             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1329           if (last_fish_arrived_at==0 ||
1330               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1331             /* outstandingFishes is set in sendFish, processFish;
1332                avoid flooding system with fishes via delay */
1333     next_fish_to_send_at = 0;  
1334   } else {
1335     /* ToDo: this should be done in the main scheduling loop to avoid the
1336              busy wait here; not so bad if fish delay is very small  */
1337     int iq = 0; // DEBUGGING -- HWL
1338     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1339     /* send a fish when ready, but process messages that arrive in the meantime */
1340     do {
1341       if (PacketsWaiting()) {
1342         iq++; // DEBUGGING
1343         *receivedFinish = processMessages();
1344       }
1345       now = msTime();
1346     } while (!*receivedFinish || now<next_fish_to_send_at);
1347     // JB: This means the fish could become obsolete, if we receive
1348     // work. Better check for work again? 
1349     // last line: while (!receivedFinish || !haveWork || now<...)
1350     // next line: if (receivedFinish || haveWork )
1351
1352     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1353       return rtsFalse;  // NB: this will leave scheduler loop
1354                         // immediately after return!
1355                           
1356     IF_PAR_DEBUG(fish, // verbose,
1357                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1358
1359   }
1360
1361     // JB: IMHO, this should all be hidden inside sendFish(...)
1362     /* pe = choosePE(); 
1363        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1364                 NEW_FISH_HUNGER);
1365
1366     // Global statistics: count no. of fishes
1367     if (RtsFlags.ParFlags.ParStats.Global &&
1368          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1369            globalParStats.tot_fish_mess++;
1370            }
1371     */ 
1372
1373   /* delayed fishes must have been sent by now! */
1374   next_fish_to_send_at = 0;  
1375   }
1376       
1377   *receivedFinish = processMessages();
1378 # endif /* SPARKS */
1379
1380  return rtsFalse;
1381  /* NB: this function always returns rtsFalse, meaning the scheduler
1382     loop continues with the next iteration; 
1383     rationale: 
1384       return code means success in finding work; we enter this function
1385       if there is no local work, thus have to send a fish which takes
1386       time until it arrives with work; in the meantime we should process
1387       messages in the main loop;
1388  */
1389 }
1390 #endif // PARALLEL_HASKELL
1391
1392 /* ----------------------------------------------------------------------------
1393  * PAR/GRAN: Report stats & debugging info(?)
1394  * ------------------------------------------------------------------------- */
1395
1396 #if defined(PAR) || defined(GRAN)
1397 static void
1398 scheduleGranParReport(void)
1399 {
1400   ASSERT(run_queue_hd != END_TSO_QUEUE);
1401
1402   /* Take a thread from the run queue, if we have work */
1403   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1404
1405     /* If this TSO has got its outport closed in the meantime, 
1406      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1407      * It has to be marked as TH_DEAD for this purpose.
1408      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1409
1410 JB: TODO: investigate wether state change field could be nuked
1411      entirely and replaced by the normal tso state (whatnext
1412      field). All we want to do is to kill tsos from outside.
1413      */
1414
1415     /* ToDo: write something to the log-file
1416     if (RTSflags.ParFlags.granSimStats && !sameThread)
1417         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1418
1419     CurrentTSO = t;
1420     */
1421     /* the spark pool for the current PE */
1422     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1423
1424     IF_DEBUG(scheduler, 
1425              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1426                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1427
1428     IF_PAR_DEBUG(fish,
1429              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1430                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1431
1432     if (RtsFlags.ParFlags.ParStats.Full && 
1433         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1434         (emitSchedule || // forced emit
1435          (t && LastTSO && t->id != LastTSO->id))) {
1436       /* 
1437          we are running a different TSO, so write a schedule event to log file
1438          NB: If we use fair scheduling we also have to write  a deschedule 
1439              event for LastTSO; with unfair scheduling we know that the
1440              previous tso has blocked whenever we switch to another tso, so
1441              we don't need it in GUM for now
1442       */
1443       IF_PAR_DEBUG(fish, // schedule,
1444                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1445
1446       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1447                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1448       emitSchedule = rtsFalse;
1449     }
1450 }     
1451 #endif
1452
1453 /* ----------------------------------------------------------------------------
1454  * After running a thread...
1455  * ------------------------------------------------------------------------- */
1456
1457 static void
1458 schedulePostRunThread (StgTSO *t)
1459 {
1460     // We have to be able to catch transactions that are in an
1461     // infinite loop as a result of seeing an inconsistent view of
1462     // memory, e.g. 
1463     //
1464     //   atomically $ do
1465     //       [a,b] <- mapM readTVar [ta,tb]
1466     //       when (a == b) loop
1467     //
1468     // and a is never equal to b given a consistent view of memory.
1469     //
1470     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1471         if (!stmValidateNestOfTransactions (t -> trec)) {
1472             debugTrace(DEBUG_sched | DEBUG_stm,
1473                        "trec %p found wasting its time", t);
1474             
1475             // strip the stack back to the
1476             // ATOMICALLY_FRAME, aborting the (nested)
1477             // transaction, and saving the stack of any
1478             // partially-evaluated thunks on the heap.
1479             throwToSingleThreaded_(&capabilities[0], t, 
1480                                    NULL, rtsTrue, NULL);
1481             
1482 #ifdef REG_R1
1483             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1484 #endif
1485         }
1486     }
1487
1488 #if defined(PAR)
1489     /* HACK 675: if the last thread didn't yield, make sure to print a 
1490        SCHEDULE event to the log file when StgRunning the next thread, even
1491        if it is the same one as before */
1492     LastTSO = t; 
1493     TimeOfLastYield = CURRENT_TIME;
1494 #endif
1495
1496   /* some statistics gathering in the parallel case */
1497
1498 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1499   switch (ret) {
1500     case HeapOverflow:
1501 # if defined(GRAN)
1502       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1503       globalGranStats.tot_heapover++;
1504 # elif defined(PAR)
1505       globalParStats.tot_heapover++;
1506 # endif
1507       break;
1508
1509      case StackOverflow:
1510 # if defined(GRAN)
1511       IF_DEBUG(gran, 
1512                DumpGranEvent(GR_DESCHEDULE, t));
1513       globalGranStats.tot_stackover++;
1514 # elif defined(PAR)
1515       // IF_DEBUG(par, 
1516       // DumpGranEvent(GR_DESCHEDULE, t);
1517       globalParStats.tot_stackover++;
1518 # endif
1519       break;
1520
1521     case ThreadYielding:
1522 # if defined(GRAN)
1523       IF_DEBUG(gran, 
1524                DumpGranEvent(GR_DESCHEDULE, t));
1525       globalGranStats.tot_yields++;
1526 # elif defined(PAR)
1527       // IF_DEBUG(par, 
1528       // DumpGranEvent(GR_DESCHEDULE, t);
1529       globalParStats.tot_yields++;
1530 # endif
1531       break; 
1532
1533     case ThreadBlocked:
1534 # if defined(GRAN)
1535         debugTrace(DEBUG_sched, 
1536                    "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1537                    t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1538                    (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1539                if (t->block_info.closure!=(StgClosure*)NULL)
1540                  print_bq(t->block_info.closure);
1541                debugBelch("\n"));
1542
1543       // ??? needed; should emit block before
1544       IF_DEBUG(gran, 
1545                DumpGranEvent(GR_DESCHEDULE, t)); 
1546       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1547       /*
1548         ngoq Dogh!
1549       ASSERT(procStatus[CurrentProc]==Busy || 
1550               ((procStatus[CurrentProc]==Fetching) && 
1551               (t->block_info.closure!=(StgClosure*)NULL)));
1552       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1553           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1554             procStatus[CurrentProc]==Fetching)) 
1555         procStatus[CurrentProc] = Idle;
1556       */
1557 # elif defined(PAR)
1558 //++PAR++  blockThread() writes the event (change?)
1559 # endif
1560     break;
1561
1562   case ThreadFinished:
1563     break;
1564
1565   default:
1566     barf("parGlobalStats: unknown return code");
1567     break;
1568     }
1569 #endif
1570 }
1571
1572 /* -----------------------------------------------------------------------------
1573  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1574  * -------------------------------------------------------------------------- */
1575
1576 static rtsBool
1577 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1578 {
1579     // did the task ask for a large block?
1580     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1581         // if so, get one and push it on the front of the nursery.
1582         bdescr *bd;
1583         lnat blocks;
1584         
1585         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1586         
1587         debugTrace(DEBUG_sched,
1588                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1589                    (long)t->id, whatNext_strs[t->what_next], blocks);
1590     
1591         // don't do this if the nursery is (nearly) full, we'll GC first.
1592         if (cap->r.rCurrentNursery->link != NULL ||
1593             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1594                                                // if the nursery has only one block.
1595             
1596             ACQUIRE_SM_LOCK
1597             bd = allocGroup( blocks );
1598             RELEASE_SM_LOCK
1599             cap->r.rNursery->n_blocks += blocks;
1600             
1601             // link the new group into the list
1602             bd->link = cap->r.rCurrentNursery;
1603             bd->u.back = cap->r.rCurrentNursery->u.back;
1604             if (cap->r.rCurrentNursery->u.back != NULL) {
1605                 cap->r.rCurrentNursery->u.back->link = bd;
1606             } else {
1607 #if !defined(THREADED_RTS)
1608                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1609                        g0s0 == cap->r.rNursery);
1610 #endif
1611                 cap->r.rNursery->blocks = bd;
1612             }             
1613             cap->r.rCurrentNursery->u.back = bd;
1614             
1615             // initialise it as a nursery block.  We initialise the
1616             // step, gen_no, and flags field of *every* sub-block in
1617             // this large block, because this is easier than making
1618             // sure that we always find the block head of a large
1619             // block whenever we call Bdescr() (eg. evacuate() and
1620             // isAlive() in the GC would both have to do this, at
1621             // least).
1622             { 
1623                 bdescr *x;
1624                 for (x = bd; x < bd + blocks; x++) {
1625                     x->step = cap->r.rNursery;
1626                     x->gen_no = 0;
1627                     x->flags = 0;
1628                 }
1629             }
1630             
1631             // This assert can be a killer if the app is doing lots
1632             // of large block allocations.
1633             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1634             
1635             // now update the nursery to point to the new block
1636             cap->r.rCurrentNursery = bd;
1637             
1638             // we might be unlucky and have another thread get on the
1639             // run queue before us and steal the large block, but in that
1640             // case the thread will just end up requesting another large
1641             // block.
1642             pushOnRunQueue(cap,t);
1643             return rtsFalse;  /* not actually GC'ing */
1644         }
1645     }
1646     
1647     debugTrace(DEBUG_sched,
1648                "--<< thread %ld (%s) stopped: HeapOverflow",
1649                (long)t->id, whatNext_strs[t->what_next]);
1650
1651 #if defined(GRAN)
1652     ASSERT(!is_on_queue(t,CurrentProc));
1653 #elif defined(PARALLEL_HASKELL)
1654     /* Currently we emit a DESCHEDULE event before GC in GUM.
1655        ToDo: either add separate event to distinguish SYSTEM time from rest
1656        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1657     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1658         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1659                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1660         emitSchedule = rtsTrue;
1661     }
1662 #endif
1663       
1664     if (context_switch) {
1665         // Sometimes we miss a context switch, e.g. when calling
1666         // primitives in a tight loop, MAYBE_GC() doesn't check the
1667         // context switch flag, and we end up waiting for a GC.
1668         // See #1984, and concurrent/should_run/1984
1669         context_switch = 0;
1670         addToRunQueue(cap,t);
1671     } else {
1672         pushOnRunQueue(cap,t);
1673     }
1674     return rtsTrue;
1675     /* actual GC is done at the end of the while loop in schedule() */
1676 }
1677
1678 /* -----------------------------------------------------------------------------
1679  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1680  * -------------------------------------------------------------------------- */
1681
1682 static void
1683 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1684 {
1685     debugTrace (DEBUG_sched,
1686                 "--<< thread %ld (%s) stopped, StackOverflow", 
1687                 (long)t->id, whatNext_strs[t->what_next]);
1688
1689     /* just adjust the stack for this thread, then pop it back
1690      * on the run queue.
1691      */
1692     { 
1693         /* enlarge the stack */
1694         StgTSO *new_t = threadStackOverflow(cap, t);
1695         
1696         /* The TSO attached to this Task may have moved, so update the
1697          * pointer to it.
1698          */
1699         if (task->tso == t) {
1700             task->tso = new_t;
1701         }
1702         pushOnRunQueue(cap,new_t);
1703     }
1704 }
1705
1706 /* -----------------------------------------------------------------------------
1707  * Handle a thread that returned to the scheduler with ThreadYielding
1708  * -------------------------------------------------------------------------- */
1709
1710 static rtsBool
1711 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1712 {
1713     // Reset the context switch flag.  We don't do this just before
1714     // running the thread, because that would mean we would lose ticks
1715     // during GC, which can lead to unfair scheduling (a thread hogs
1716     // the CPU because the tick always arrives during GC).  This way
1717     // penalises threads that do a lot of allocation, but that seems
1718     // better than the alternative.
1719     context_switch = 0;
1720     
1721     /* put the thread back on the run queue.  Then, if we're ready to
1722      * GC, check whether this is the last task to stop.  If so, wake
1723      * up the GC thread.  getThread will block during a GC until the
1724      * GC is finished.
1725      */
1726 #ifdef DEBUG
1727     if (t->what_next != prev_what_next) {
1728         debugTrace(DEBUG_sched,
1729                    "--<< thread %ld (%s) stopped to switch evaluators", 
1730                    (long)t->id, whatNext_strs[t->what_next]);
1731     } else {
1732         debugTrace(DEBUG_sched,
1733                    "--<< thread %ld (%s) stopped, yielding",
1734                    (long)t->id, whatNext_strs[t->what_next]);
1735     }
1736 #endif
1737     
1738     IF_DEBUG(sanity,
1739              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1740              checkTSO(t));
1741     ASSERT(t->_link == END_TSO_QUEUE);
1742     
1743     // Shortcut if we're just switching evaluators: don't bother
1744     // doing stack squeezing (which can be expensive), just run the
1745     // thread.
1746     if (t->what_next != prev_what_next) {
1747         return rtsTrue;
1748     }
1749     
1750 #if defined(GRAN)
1751     ASSERT(!is_on_queue(t,CurrentProc));
1752       
1753     IF_DEBUG(sanity,
1754              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1755              checkThreadQsSanity(rtsTrue));
1756
1757 #endif
1758
1759     addToRunQueue(cap,t);
1760
1761 #if defined(GRAN)
1762     /* add a ContinueThread event to actually process the thread */
1763     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1764               ContinueThread,
1765               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1766     IF_GRAN_DEBUG(bq, 
1767                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1768                   G_EVENTQ(0);
1769                   G_CURR_THREADQ(0));
1770 #endif
1771     return rtsFalse;
1772 }
1773
1774 /* -----------------------------------------------------------------------------
1775  * Handle a thread that returned to the scheduler with ThreadBlocked
1776  * -------------------------------------------------------------------------- */
1777
1778 static void
1779 scheduleHandleThreadBlocked( StgTSO *t
1780 #if !defined(GRAN) && !defined(DEBUG)
1781     STG_UNUSED
1782 #endif
1783     )
1784 {
1785 #if defined(GRAN)
1786     IF_DEBUG(scheduler,
1787              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1788                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1789              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1790     
1791     // ??? needed; should emit block before
1792     IF_DEBUG(gran, 
1793              DumpGranEvent(GR_DESCHEDULE, t)); 
1794     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1795     /*
1796       ngoq Dogh!
1797       ASSERT(procStatus[CurrentProc]==Busy || 
1798       ((procStatus[CurrentProc]==Fetching) && 
1799       (t->block_info.closure!=(StgClosure*)NULL)));
1800       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1801       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1802       procStatus[CurrentProc]==Fetching)) 
1803       procStatus[CurrentProc] = Idle;
1804     */
1805 #elif defined(PAR)
1806     IF_DEBUG(scheduler,
1807              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1808                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1809     IF_PAR_DEBUG(bq,
1810                  
1811                  if (t->block_info.closure!=(StgClosure*)NULL) 
1812                  print_bq(t->block_info.closure));
1813     
1814     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1815     blockThread(t);
1816     
1817     /* whatever we schedule next, we must log that schedule */
1818     emitSchedule = rtsTrue;
1819     
1820 #else /* !GRAN */
1821
1822       // We don't need to do anything.  The thread is blocked, and it
1823       // has tidied up its stack and placed itself on whatever queue
1824       // it needs to be on.
1825
1826     // ASSERT(t->why_blocked != NotBlocked);
1827     // Not true: for example,
1828     //    - in THREADED_RTS, the thread may already have been woken
1829     //      up by another Capability.  This actually happens: try
1830     //      conc023 +RTS -N2.
1831     //    - the thread may have woken itself up already, because
1832     //      threadPaused() might have raised a blocked throwTo
1833     //      exception, see maybePerformBlockedException().
1834
1835 #ifdef DEBUG
1836     if (traceClass(DEBUG_sched)) {
1837         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1838                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1839         printThreadBlockage(t);
1840         debugTraceEnd();
1841     }
1842 #endif
1843     
1844     /* Only for dumping event to log file 
1845        ToDo: do I need this in GranSim, too?
1846        blockThread(t);
1847     */
1848 #endif
1849 }
1850
1851 /* -----------------------------------------------------------------------------
1852  * Handle a thread that returned to the scheduler with ThreadFinished
1853  * -------------------------------------------------------------------------- */
1854
1855 static rtsBool
1856 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1857 {
1858     /* Need to check whether this was a main thread, and if so,
1859      * return with the return value.
1860      *
1861      * We also end up here if the thread kills itself with an
1862      * uncaught exception, see Exception.cmm.
1863      */
1864     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1865                (unsigned long)t->id, whatNext_strs[t->what_next]);
1866
1867 #if defined(GRAN)
1868       endThread(t, CurrentProc); // clean-up the thread
1869 #elif defined(PARALLEL_HASKELL)
1870       /* For now all are advisory -- HWL */
1871       //if(t->priority==AdvisoryPriority) ??
1872       advisory_thread_count--; // JB: Caution with this counter, buggy!
1873       
1874 # if defined(DIST)
1875       if(t->dist.priority==RevalPriority)
1876         FinishReval(t);
1877 # endif
1878     
1879 # if defined(EDENOLD)
1880       // the thread could still have an outport... (BUG)
1881       if (t->eden.outport != -1) {
1882       // delete the outport for the tso which has finished...
1883         IF_PAR_DEBUG(eden_ports,
1884                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1885                               t->eden.outport, t->id));
1886         deleteOPT(t);
1887       }
1888       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1889       if (t->eden.epid != -1) {
1890         IF_PAR_DEBUG(eden_ports,
1891                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1892                            t->id, t->eden.epid));
1893         removeTSOfromProcess(t);
1894       }
1895 # endif 
1896
1897 # if defined(PAR)
1898       if (RtsFlags.ParFlags.ParStats.Full &&
1899           !RtsFlags.ParFlags.ParStats.Suppressed) 
1900         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1901
1902       //  t->par only contains statistics: left out for now...
1903       IF_PAR_DEBUG(fish,
1904                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1905                               t->id,t,t->par.sparkname));
1906 # endif
1907 #endif // PARALLEL_HASKELL
1908
1909       //
1910       // Check whether the thread that just completed was a bound
1911       // thread, and if so return with the result.  
1912       //
1913       // There is an assumption here that all thread completion goes
1914       // through this point; we need to make sure that if a thread
1915       // ends up in the ThreadKilled state, that it stays on the run
1916       // queue so it can be dealt with here.
1917       //
1918
1919       if (t->bound) {
1920
1921           if (t->bound != task) {
1922 #if !defined(THREADED_RTS)
1923               // Must be a bound thread that is not the topmost one.  Leave
1924               // it on the run queue until the stack has unwound to the
1925               // point where we can deal with this.  Leaving it on the run
1926               // queue also ensures that the garbage collector knows about
1927               // this thread and its return value (it gets dropped from the
1928               // step->threads list so there's no other way to find it).
1929               appendToRunQueue(cap,t);
1930               return rtsFalse;
1931 #else
1932               // this cannot happen in the threaded RTS, because a
1933               // bound thread can only be run by the appropriate Task.
1934               barf("finished bound thread that isn't mine");
1935 #endif
1936           }
1937
1938           ASSERT(task->tso == t);
1939
1940           if (t->what_next == ThreadComplete) {
1941               if (task->ret) {
1942                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1943                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1944               }
1945               task->stat = Success;
1946           } else {
1947               if (task->ret) {
1948                   *(task->ret) = NULL;
1949               }
1950               if (sched_state >= SCHED_INTERRUPTING) {
1951                   task->stat = Interrupted;
1952               } else {
1953                   task->stat = Killed;
1954               }
1955           }
1956 #ifdef DEBUG
1957           removeThreadLabel((StgWord)task->tso->id);
1958 #endif
1959           return rtsTrue; // tells schedule() to return
1960       }
1961
1962       return rtsFalse;
1963 }
1964
1965 /* -----------------------------------------------------------------------------
1966  * Perform a heap census
1967  * -------------------------------------------------------------------------- */
1968
1969 static rtsBool
1970 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1971 {
1972     // When we have +RTS -i0 and we're heap profiling, do a census at
1973     // every GC.  This lets us get repeatable runs for debugging.
1974     if (performHeapProfile ||
1975         (RtsFlags.ProfFlags.profileInterval==0 &&
1976          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1977         return rtsTrue;
1978     } else {
1979         return rtsFalse;
1980     }
1981 }
1982
1983 /* -----------------------------------------------------------------------------
1984  * Perform a garbage collection if necessary
1985  * -------------------------------------------------------------------------- */
1986
1987 static Capability *
1988 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1989 {
1990     StgTSO *t;
1991     rtsBool heap_census;
1992 #ifdef THREADED_RTS
1993     static volatile StgWord waiting_for_gc;
1994     rtsBool was_waiting;
1995     nat i;
1996 #endif
1997
1998 #ifdef THREADED_RTS
1999     // In order to GC, there must be no threads running Haskell code.
2000     // Therefore, the GC thread needs to hold *all* the capabilities,
2001     // and release them after the GC has completed.  
2002     //
2003     // This seems to be the simplest way: previous attempts involved
2004     // making all the threads with capabilities give up their
2005     // capabilities and sleep except for the *last* one, which
2006     // actually did the GC.  But it's quite hard to arrange for all
2007     // the other tasks to sleep and stay asleep.
2008     //
2009         
2010     was_waiting = cas(&waiting_for_gc, 0, 1);
2011     if (was_waiting) {
2012         do {
2013             debugTrace(DEBUG_sched, "someone else is trying to GC...");
2014             if (cap) yieldCapability(&cap,task);
2015         } while (waiting_for_gc);
2016         return cap;  // NOTE: task->cap might have changed here
2017     }
2018
2019     for (i=0; i < n_capabilities; i++) {
2020         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
2021         if (cap != &capabilities[i]) {
2022             Capability *pcap = &capabilities[i];
2023             // we better hope this task doesn't get migrated to
2024             // another Capability while we're waiting for this one.
2025             // It won't, because load balancing happens while we have
2026             // all the Capabilities, but even so it's a slightly
2027             // unsavoury invariant.
2028             task->cap = pcap;
2029             context_switch = 1;
2030             waitForReturnCapability(&pcap, task);
2031             if (pcap != &capabilities[i]) {
2032                 barf("scheduleDoGC: got the wrong capability");
2033             }
2034         }
2035     }
2036
2037     waiting_for_gc = rtsFalse;
2038 #endif
2039
2040     // so this happens periodically:
2041     if (cap) scheduleCheckBlackHoles(cap);
2042     
2043     IF_DEBUG(scheduler, printAllThreads());
2044
2045     /*
2046      * We now have all the capabilities; if we're in an interrupting
2047      * state, then we should take the opportunity to delete all the
2048      * threads in the system.
2049      */
2050     if (sched_state >= SCHED_INTERRUPTING) {
2051         deleteAllThreads(&capabilities[0]);
2052         sched_state = SCHED_SHUTTING_DOWN;
2053     }
2054     
2055     heap_census = scheduleNeedHeapProfile(rtsTrue);
2056
2057     /* everybody back, start the GC.
2058      * Could do it in this thread, or signal a condition var
2059      * to do it in another thread.  Either way, we need to
2060      * broadcast on gc_pending_cond afterward.
2061      */
2062 #if defined(THREADED_RTS)
2063     debugTrace(DEBUG_sched, "doing GC");
2064 #endif
2065     GarbageCollect(force_major || heap_census);
2066     
2067     if (heap_census) {
2068         debugTrace(DEBUG_sched, "performing heap census");
2069         heapCensus();
2070         performHeapProfile = rtsFalse;
2071     }
2072
2073 #if defined(THREADED_RTS)
2074     // release our stash of capabilities.
2075     for (i = 0; i < n_capabilities; i++) {
2076         if (cap != &capabilities[i]) {
2077             task->cap = &capabilities[i];
2078             releaseCapability(&capabilities[i]);
2079         }
2080     }
2081     if (cap) {
2082         task->cap = cap;
2083     } else {
2084         task->cap = NULL;
2085     }
2086 #endif
2087
2088 #if defined(GRAN)
2089     /* add a ContinueThread event to continue execution of current thread */
2090     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2091               ContinueThread,
2092               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2093     IF_GRAN_DEBUG(bq, 
2094                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2095                   G_EVENTQ(0);
2096                   G_CURR_THREADQ(0));
2097 #endif /* GRAN */
2098
2099     return cap;
2100 }
2101
2102 /* ---------------------------------------------------------------------------
2103  * Singleton fork(). Do not copy any running threads.
2104  * ------------------------------------------------------------------------- */
2105
2106 pid_t
2107 forkProcess(HsStablePtr *entry
2108 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2109             STG_UNUSED
2110 #endif
2111            )
2112 {
2113 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2114     Task *task;
2115     pid_t pid;
2116     StgTSO* t,*next;
2117     Capability *cap;
2118     nat s;
2119     
2120 #if defined(THREADED_RTS)
2121     if (RtsFlags.ParFlags.nNodes > 1) {
2122         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2123         stg_exit(EXIT_FAILURE);
2124     }
2125 #endif
2126
2127     debugTrace(DEBUG_sched, "forking!");
2128     
2129     // ToDo: for SMP, we should probably acquire *all* the capabilities
2130     cap = rts_lock();
2131     
2132     // no funny business: hold locks while we fork, otherwise if some
2133     // other thread is holding a lock when the fork happens, the data
2134     // structure protected by the lock will forever be in an
2135     // inconsistent state in the child.  See also #1391.
2136     ACQUIRE_LOCK(&sched_mutex);
2137     ACQUIRE_LOCK(&cap->lock);
2138     ACQUIRE_LOCK(&cap->running_task->lock);
2139
2140     pid = fork();
2141     
2142     if (pid) { // parent
2143         
2144         RELEASE_LOCK(&sched_mutex);
2145         RELEASE_LOCK(&cap->lock);
2146         RELEASE_LOCK(&cap->running_task->lock);
2147
2148         // just return the pid
2149         rts_unlock(cap);
2150         return pid;
2151         
2152     } else { // child
2153         
2154 #if defined(THREADED_RTS)
2155         initMutex(&sched_mutex);
2156         initMutex(&cap->lock);
2157         initMutex(&cap->running_task->lock);
2158 #endif
2159
2160         // Now, all OS threads except the thread that forked are
2161         // stopped.  We need to stop all Haskell threads, including
2162         // those involved in foreign calls.  Also we need to delete
2163         // all Tasks, because they correspond to OS threads that are
2164         // now gone.
2165
2166         for (s = 0; s < total_steps; s++) {
2167           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
2168             if (t->what_next == ThreadRelocated) {
2169                 next = t->_link;
2170             } else {
2171                 next = t->global_link;
2172                 // don't allow threads to catch the ThreadKilled
2173                 // exception, but we do want to raiseAsync() because these
2174                 // threads may be evaluating thunks that we need later.
2175                 deleteThread_(cap,t);
2176             }
2177           }
2178         }
2179         
2180         // Empty the run queue.  It seems tempting to let all the
2181         // killed threads stay on the run queue as zombies to be
2182         // cleaned up later, but some of them correspond to bound
2183         // threads for which the corresponding Task does not exist.
2184         cap->run_queue_hd = END_TSO_QUEUE;
2185         cap->run_queue_tl = END_TSO_QUEUE;
2186
2187         // Any suspended C-calling Tasks are no more, their OS threads
2188         // don't exist now:
2189         cap->suspended_ccalling_tasks = NULL;
2190
2191         // Empty the threads lists.  Otherwise, the garbage
2192         // collector may attempt to resurrect some of these threads.
2193         for (s = 0; s < total_steps; s++) {
2194             all_steps[s].threads = END_TSO_QUEUE;
2195         }
2196
2197         // Wipe the task list, except the current Task.
2198         ACQUIRE_LOCK(&sched_mutex);
2199         for (task = all_tasks; task != NULL; task=task->all_link) {
2200             if (task != cap->running_task) {
2201 #if defined(THREADED_RTS)
2202                 initMutex(&task->lock); // see #1391
2203 #endif
2204                 discardTask(task);
2205             }
2206         }
2207         RELEASE_LOCK(&sched_mutex);
2208
2209 #if defined(THREADED_RTS)
2210         // Wipe our spare workers list, they no longer exist.  New
2211         // workers will be created if necessary.
2212         cap->spare_workers = NULL;
2213         cap->returning_tasks_hd = NULL;
2214         cap->returning_tasks_tl = NULL;
2215 #endif
2216
2217         // On Unix, all timers are reset in the child, so we need to start
2218         // the timer again.
2219         initTimer();
2220         startTimer();
2221
2222         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2223         rts_checkSchedStatus("forkProcess",cap);
2224         
2225         rts_unlock(cap);
2226         hs_exit();                      // clean up and exit
2227         stg_exit(EXIT_SUCCESS);
2228     }
2229 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2230     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2231     return -1;
2232 #endif
2233 }
2234
2235 /* ---------------------------------------------------------------------------
2236  * Delete all the threads in the system
2237  * ------------------------------------------------------------------------- */
2238    
2239 static void
2240 deleteAllThreads ( Capability *cap )
2241 {
2242     // NOTE: only safe to call if we own all capabilities.
2243
2244     StgTSO* t, *next;
2245     nat s;
2246
2247     debugTrace(DEBUG_sched,"deleting all threads");
2248     for (s = 0; s < total_steps; s++) {
2249       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
2250         if (t->what_next == ThreadRelocated) {
2251             next = t->_link;
2252         } else {
2253             next = t->global_link;
2254             deleteThread(cap,t);
2255         }
2256       }
2257     }      
2258
2259     // The run queue now contains a bunch of ThreadKilled threads.  We
2260     // must not throw these away: the main thread(s) will be in there
2261     // somewhere, and the main scheduler loop has to deal with it.
2262     // Also, the run queue is the only thing keeping these threads from
2263     // being GC'd, and we don't want the "main thread has been GC'd" panic.
2264
2265 #if !defined(THREADED_RTS)
2266     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2267     ASSERT(sleeping_queue == END_TSO_QUEUE);
2268 #endif
2269 }
2270
2271 /* -----------------------------------------------------------------------------
2272    Managing the suspended_ccalling_tasks list.
2273    Locks required: sched_mutex
2274    -------------------------------------------------------------------------- */
2275
2276 STATIC_INLINE void
2277 suspendTask (Capability *cap, Task *task)
2278 {
2279     ASSERT(task->next == NULL && task->prev == NULL);
2280     task->next = cap->suspended_ccalling_tasks;
2281     task->prev = NULL;
2282     if (cap->suspended_ccalling_tasks) {
2283         cap->suspended_ccalling_tasks->prev = task;
2284     }
2285     cap->suspended_ccalling_tasks = task;
2286 }
2287
2288 STATIC_INLINE void
2289 recoverSuspendedTask (Capability *cap, Task *task)
2290 {
2291     if (task->prev) {
2292         task->prev->next = task->next;
2293     } else {
2294         ASSERT(cap->suspended_ccalling_tasks == task);
2295         cap->suspended_ccalling_tasks = task->next;
2296     }
2297     if (task->next) {
2298         task->next->prev = task->prev;
2299     }
2300     task->next = task->prev = NULL;
2301 }
2302
2303 /* ---------------------------------------------------------------------------
2304  * Suspending & resuming Haskell threads.
2305  * 
2306  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2307  * its capability before calling the C function.  This allows another
2308  * task to pick up the capability and carry on running Haskell
2309  * threads.  It also means that if the C call blocks, it won't lock
2310  * the whole system.
2311  *
2312  * The Haskell thread making the C call is put to sleep for the
2313  * duration of the call, on the susepended_ccalling_threads queue.  We
2314  * give out a token to the task, which it can use to resume the thread
2315  * on return from the C function.
2316  * ------------------------------------------------------------------------- */
2317    
2318 void *
2319 suspendThread (StgRegTable *reg)
2320 {
2321   Capability *cap;
2322   int saved_errno;
2323   StgTSO *tso;
2324   Task *task;
2325 #if mingw32_HOST_OS
2326   StgWord32 saved_winerror;
2327 #endif
2328
2329   saved_errno = errno;
2330 #if mingw32_HOST_OS
2331   saved_winerror = GetLastError();
2332 #endif
2333
2334   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2335    */
2336   cap = regTableToCapability(reg);
2337
2338   task = cap->running_task;
2339   tso = cap->r.rCurrentTSO;
2340
2341   debugTrace(DEBUG_sched, 
2342              "thread %lu did a safe foreign call", 
2343              (unsigned long)cap->r.rCurrentTSO->id);
2344
2345   // XXX this might not be necessary --SDM
2346   tso->what_next = ThreadRunGHC;
2347
2348   threadPaused(cap,tso);
2349
2350   if ((tso->flags & TSO_BLOCKEX) == 0)  {
2351       tso->why_blocked = BlockedOnCCall;
2352       tso->flags |= TSO_BLOCKEX;
2353       tso->flags &= ~TSO_INTERRUPTIBLE;
2354   } else {
2355       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2356   }
2357
2358   // Hand back capability
2359   task->suspended_tso = tso;
2360
2361   ACQUIRE_LOCK(&cap->lock);
2362
2363   suspendTask(cap,task);
2364   cap->in_haskell = rtsFalse;
2365   releaseCapability_(cap);
2366   
2367   RELEASE_LOCK(&cap->lock);
2368
2369 #if defined(THREADED_RTS)
2370   /* Preparing to leave the RTS, so ensure there's a native thread/task
2371      waiting to take over.
2372   */
2373   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2374 #endif
2375
2376   errno = saved_errno;
2377 #if mingw32_HOST_OS
2378   SetLastError(saved_winerror);
2379 #endif
2380   return task;
2381 }
2382
2383 StgRegTable *
2384 resumeThread (void *task_)
2385 {
2386     StgTSO *tso;
2387     Capability *cap;
2388     Task *task = task_;
2389     int saved_errno;
2390 #if mingw32_HOST_OS
2391     StgWord32 saved_winerror;
2392 #endif
2393
2394     saved_errno = errno;
2395 #if mingw32_HOST_OS
2396     saved_winerror = GetLastError();
2397 #endif
2398
2399     cap = task->cap;
2400     // Wait for permission to re-enter the RTS with the result.
2401     waitForReturnCapability(&cap,task);
2402     // we might be on a different capability now... but if so, our
2403     // entry on the suspended_ccalling_tasks list will also have been
2404     // migrated.
2405
2406     // Remove the thread from the suspended list
2407     recoverSuspendedTask(cap,task);
2408
2409     tso = task->suspended_tso;
2410     task->suspended_tso = NULL;
2411     tso->_link = END_TSO_QUEUE; // no write barrier reqd
2412     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2413     
2414     if (tso->why_blocked == BlockedOnCCall) {
2415         awakenBlockedExceptionQueue(cap,tso);
2416         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2417     }
2418     
2419     /* Reset blocking status */
2420     tso->why_blocked  = NotBlocked;
2421     
2422     cap->r.rCurrentTSO = tso;
2423     cap->in_haskell = rtsTrue;
2424     errno = saved_errno;
2425 #if mingw32_HOST_OS
2426     SetLastError(saved_winerror);
2427 #endif
2428
2429     /* We might have GC'd, mark the TSO dirty again */
2430     dirty_TSO(cap,tso);
2431
2432     IF_DEBUG(sanity, checkTSO(tso));
2433
2434     return &cap->r;
2435 }
2436
2437 /* ---------------------------------------------------------------------------
2438  * scheduleThread()
2439  *
2440  * scheduleThread puts a thread on the end  of the runnable queue.
2441  * This will usually be done immediately after a thread is created.
2442  * The caller of scheduleThread must create the thread using e.g.
2443  * createThread and push an appropriate closure
2444  * on this thread's stack before the scheduler is invoked.
2445  * ------------------------------------------------------------------------ */
2446
2447 void
2448 scheduleThread(Capability *cap, StgTSO *tso)
2449 {
2450     // The thread goes at the *end* of the run-queue, to avoid possible
2451     // starvation of any threads already on the queue.
2452     appendToRunQueue(cap,tso);
2453 }
2454
2455 void
2456 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2457 {
2458 #if defined(THREADED_RTS)
2459     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2460                               // move this thread from now on.
2461     cpu %= RtsFlags.ParFlags.nNodes;
2462     if (cpu == cap->no) {
2463         appendToRunQueue(cap,tso);
2464     } else {
2465         migrateThreadToCapability_lock(&capabilities[cpu],tso);
2466     }
2467 #else
2468     appendToRunQueue(cap,tso);
2469 #endif
2470 }
2471
2472 Capability *
2473 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2474 {
2475     Task *task;
2476
2477     // We already created/initialised the Task
2478     task = cap->running_task;
2479
2480     // This TSO is now a bound thread; make the Task and TSO
2481     // point to each other.
2482     tso->bound = task;
2483     tso->cap = cap;
2484
2485     task->tso = tso;
2486     task->ret = ret;
2487     task->stat = NoStatus;
2488
2489     appendToRunQueue(cap,tso);
2490
2491     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2492
2493 #if defined(GRAN)
2494     /* GranSim specific init */
2495     CurrentTSO = m->tso;                // the TSO to run
2496     procStatus[MainProc] = Busy;        // status of main PE
2497     CurrentProc = MainProc;             // PE to run it on
2498 #endif
2499
2500     cap = schedule(cap,task);
2501
2502     ASSERT(task->stat != NoStatus);
2503     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2504
2505     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2506     return cap;
2507 }
2508
2509 /* ----------------------------------------------------------------------------
2510  * Starting Tasks
2511  * ------------------------------------------------------------------------- */
2512
2513 #if defined(THREADED_RTS)
2514 void
2515 workerStart(Task *task)
2516 {
2517     Capability *cap;
2518
2519     // See startWorkerTask().
2520     ACQUIRE_LOCK(&task->lock);
2521     cap = task->cap;
2522     RELEASE_LOCK(&task->lock);
2523
2524     // set the thread-local pointer to the Task:
2525     taskEnter(task);
2526
2527     // schedule() runs without a lock.
2528     cap = schedule(cap,task);
2529
2530     // On exit from schedule(), we have a Capability.
2531     releaseCapability(cap);
2532     workerTaskStop(task);
2533 }
2534 #endif
2535
2536 /* ---------------------------------------------------------------------------
2537  * initScheduler()
2538  *
2539  * Initialise the scheduler.  This resets all the queues - if the
2540  * queues contained any threads, they'll be garbage collected at the
2541  * next pass.
2542  *
2543  * ------------------------------------------------------------------------ */
2544
2545 void 
2546 initScheduler(void)
2547 {
2548 #if defined(GRAN)
2549   nat i;
2550   for (i=0; i<=MAX_PROC; i++) {
2551     run_queue_hds[i]      = END_TSO_QUEUE;
2552     run_queue_tls[i]      = END_TSO_QUEUE;
2553     blocked_queue_hds[i]  = END_TSO_QUEUE;
2554     blocked_queue_tls[i]  = END_TSO_QUEUE;
2555     ccalling_threadss[i]  = END_TSO_QUEUE;
2556     blackhole_queue[i]    = END_TSO_QUEUE;
2557     sleeping_queue        = END_TSO_QUEUE;
2558   }
2559 #elif !defined(THREADED_RTS)
2560   blocked_queue_hd  = END_TSO_QUEUE;
2561   blocked_queue_tl  = END_TSO_QUEUE;
2562   sleeping_queue    = END_TSO_QUEUE;
2563 #endif
2564
2565   blackhole_queue   = END_TSO_QUEUE;
2566
2567   context_switch = 0;
2568   sched_state    = SCHED_RUNNING;
2569   recent_activity = ACTIVITY_YES;
2570
2571 #if defined(THREADED_RTS)
2572   /* Initialise the mutex and condition variables used by
2573    * the scheduler. */
2574   initMutex(&sched_mutex);
2575 #endif
2576   
2577   ACQUIRE_LOCK(&sched_mutex);
2578
2579   /* A capability holds the state a native thread needs in
2580    * order to execute STG code. At least one capability is
2581    * floating around (only THREADED_RTS builds have more than one).
2582    */
2583   initCapabilities();
2584
2585   initTaskManager();
2586
2587 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2588   initSparkPools();
2589 #endif
2590
2591 #if defined(THREADED_RTS)
2592   /*
2593    * Eagerly start one worker to run each Capability, except for
2594    * Capability 0.  The idea is that we're probably going to start a
2595    * bound thread on Capability 0 pretty soon, so we don't want a
2596    * worker task hogging it.
2597    */
2598   { 
2599       nat i;
2600       Capability *cap;
2601       for (i = 1; i < n_capabilities; i++) {
2602           cap = &capabilities[i];
2603           ACQUIRE_LOCK(&cap->lock);
2604           startWorkerTask(cap, workerStart);
2605           RELEASE_LOCK(&cap->lock);
2606       }
2607   }
2608 #endif
2609
2610   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2611
2612   RELEASE_LOCK(&sched_mutex);
2613 }
2614
2615 void
2616 exitScheduler(
2617     rtsBool wait_foreign
2618 #if !defined(THREADED_RTS)
2619                          __attribute__((unused))
2620 #endif
2621 )
2622                /* see Capability.c, shutdownCapability() */
2623 {
2624     Task *task = NULL;
2625
2626 #if defined(THREADED_RTS)
2627     ACQUIRE_LOCK(&sched_mutex);
2628     task = newBoundTask();
2629     RELEASE_LOCK(&sched_mutex);
2630 #endif
2631
2632     // If we haven't killed all the threads yet, do it now.
2633     if (sched_state < SCHED_SHUTTING_DOWN) {
2634         sched_state = SCHED_INTERRUPTING;
2635         scheduleDoGC(NULL,task,rtsFalse);    
2636     }
2637     sched_state = SCHED_SHUTTING_DOWN;
2638
2639 #if defined(THREADED_RTS)
2640     { 
2641         nat i;
2642         
2643         for (i = 0; i < n_capabilities; i++) {
2644             shutdownCapability(&capabilities[i], task, wait_foreign);
2645         }
2646         boundTaskExiting(task);
2647         stopTaskManager();
2648     }
2649 #else
2650     freeCapability(&MainCapability);
2651 #endif
2652 }
2653
2654 void
2655 freeScheduler( void )
2656 {
2657     freeTaskManager();
2658     if (n_capabilities != 1) {
2659         stgFree(capabilities);
2660     }
2661 #if defined(THREADED_RTS)
2662     closeMutex(&sched_mutex);
2663 #endif
2664 }
2665
2666 /* -----------------------------------------------------------------------------
2667    performGC
2668
2669    This is the interface to the garbage collector from Haskell land.
2670    We provide this so that external C code can allocate and garbage
2671    collect when called from Haskell via _ccall_GC.
2672    -------------------------------------------------------------------------- */
2673
2674 static void
2675 performGC_(rtsBool force_major)
2676 {
2677     Task *task;
2678     // We must grab a new Task here, because the existing Task may be
2679     // associated with a particular Capability, and chained onto the 
2680     // suspended_ccalling_tasks queue.
2681     ACQUIRE_LOCK(&sched_mutex);
2682     task = newBoundTask();
2683     RELEASE_LOCK(&sched_mutex);
2684     scheduleDoGC(NULL,task,force_major);
2685     boundTaskExiting(task);
2686 }
2687
2688 void
2689 performGC(void)
2690 {
2691     performGC_(rtsFalse);
2692 }
2693
2694 void
2695 performMajorGC(void)
2696 {
2697     performGC_(rtsTrue);
2698 }
2699
2700 /* -----------------------------------------------------------------------------
2701    Stack overflow
2702
2703    If the thread has reached its maximum stack size, then raise the
2704    StackOverflow exception in the offending thread.  Otherwise
2705    relocate the TSO into a larger chunk of memory and adjust its stack
2706    size appropriately.
2707    -------------------------------------------------------------------------- */
2708
2709 static StgTSO *
2710 threadStackOverflow(Capability *cap, StgTSO *tso)
2711 {
2712   nat new_stack_size, stack_words;
2713   lnat new_tso_size;
2714   StgPtr new_sp;
2715   StgTSO *dest;
2716
2717   IF_DEBUG(sanity,checkTSO(tso));
2718
2719   // don't allow throwTo() to modify the blocked_exceptions queue
2720   // while we are moving the TSO:
2721   lockClosure((StgClosure *)tso);
2722
2723   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2724       // NB. never raise a StackOverflow exception if the thread is
2725       // inside Control.Exceptino.block.  It is impractical to protect
2726       // against stack overflow exceptions, since virtually anything
2727       // can raise one (even 'catch'), so this is the only sensible
2728       // thing to do here.  See bug #767.
2729
2730       debugTrace(DEBUG_gc,
2731                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2732                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2733       IF_DEBUG(gc,
2734                /* If we're debugging, just print out the top of the stack */
2735                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2736                                                 tso->sp+64)));
2737
2738       // Send this thread the StackOverflow exception
2739       unlockTSO(tso);
2740       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2741       return tso;
2742   }
2743
2744   /* Try to double the current stack size.  If that takes us over the
2745    * maximum stack size for this thread, then use the maximum instead.
2746    * Finally round up so the TSO ends up as a whole number of blocks.
2747    */
2748   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2749   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2750                                        TSO_STRUCT_SIZE)/sizeof(W_);
2751   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2752   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2753
2754   debugTrace(DEBUG_sched, 
2755              "increasing stack size from %ld words to %d.",
2756              (long)tso->stack_size, new_stack_size);
2757
2758   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2759   TICK_ALLOC_TSO(new_stack_size,0);
2760
2761   /* copy the TSO block and the old stack into the new area */
2762   memcpy(dest,tso,TSO_STRUCT_SIZE);
2763   stack_words = tso->stack + tso->stack_size - tso->sp;
2764   new_sp = (P_)dest + new_tso_size - stack_words;
2765   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2766
2767   /* relocate the stack pointers... */
2768   dest->sp         = new_sp;
2769   dest->stack_size = new_stack_size;
2770         
2771   /* Mark the old TSO as relocated.  We have to check for relocated
2772    * TSOs in the garbage collector and any primops that deal with TSOs.
2773    *
2774    * It's important to set the sp value to just beyond the end
2775    * of the stack, so we don't attempt to scavenge any part of the
2776    * dead TSO's stack.
2777    */
2778   tso->what_next = ThreadRelocated;
2779   setTSOLink(cap,tso,dest);
2780   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2781   tso->why_blocked = NotBlocked;
2782
2783   IF_PAR_DEBUG(verbose,
2784                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2785                      tso->id, tso, tso->stack_size);
2786                /* If we're debugging, just print out the top of the stack */
2787                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2788                                                 tso->sp+64)));
2789   
2790   unlockTSO(dest);
2791   unlockTSO(tso);
2792
2793   IF_DEBUG(sanity,checkTSO(dest));
2794 #if 0
2795   IF_DEBUG(scheduler,printTSO(dest));
2796 #endif
2797
2798   return dest;
2799 }
2800
2801 static StgTSO *
2802 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2803 {
2804     bdescr *bd, *new_bd;
2805     lnat new_tso_size_w, tso_size_w;
2806     StgTSO *new_tso;
2807
2808     tso_size_w = tso_sizeW(tso);
2809
2810     if (tso_size_w < MBLOCK_SIZE_W || 
2811         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2812     {
2813         return tso;
2814     }
2815
2816     // don't allow throwTo() to modify the blocked_exceptions queue
2817     // while we are moving the TSO:
2818     lockClosure((StgClosure *)tso);
2819
2820     new_tso_size_w = round_to_mblocks(tso_size_w/2);
2821
2822     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2823                tso->id, tso_size_w, new_tso_size_w);
2824
2825     bd = Bdescr((StgPtr)tso);
2826     new_bd = splitLargeBlock(bd, new_tso_size_w / BLOCK_SIZE_W);
2827     new_bd->free = bd->free;
2828     bd->free = bd->start + TSO_STRUCT_SIZEW;
2829
2830     new_tso = (StgTSO *)new_bd->start;
2831     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2832     new_tso->stack_size = new_tso_size_w - TSO_STRUCT_SIZEW;
2833
2834     tso->what_next = ThreadRelocated;
2835     tso->_link = new_tso; // no write barrier reqd: same generation
2836
2837     // The TSO attached to this Task may have moved, so update the
2838     // pointer to it.
2839     if (task->tso == tso) {
2840         task->tso = new_tso;
2841     }
2842
2843     unlockTSO(new_tso);
2844     unlockTSO(tso);
2845
2846     IF_DEBUG(sanity,checkTSO(new_tso));
2847
2848     return new_tso;
2849 }
2850
2851 /* ---------------------------------------------------------------------------
2852    Interrupt execution
2853    - usually called inside a signal handler so it mustn't do anything fancy.   
2854    ------------------------------------------------------------------------ */
2855
2856 void
2857 interruptStgRts(void)
2858 {
2859     sched_state = SCHED_INTERRUPTING;
2860     context_switch = 1;
2861     wakeUpRts();
2862 }
2863
2864 /* -----------------------------------------------------------------------------
2865    Wake up the RTS
2866    
2867    This function causes at least one OS thread to wake up and run the
2868    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2869    an external event has arrived that may need servicing (eg. a
2870    keyboard interrupt).
2871
2872    In the single-threaded RTS we don't do anything here; we only have
2873    one thread anyway, and the event that caused us to want to wake up
2874    will have interrupted any blocking system call in progress anyway.
2875    -------------------------------------------------------------------------- */
2876
2877 void
2878 wakeUpRts(void)
2879 {
2880 #if defined(THREADED_RTS)
2881     // This forces the IO Manager thread to wakeup, which will
2882     // in turn ensure that some OS thread wakes up and runs the
2883     // scheduler loop, which will cause a GC and deadlock check.
2884     ioManagerWakeup();
2885 #endif
2886 }
2887
2888 /* -----------------------------------------------------------------------------
2889  * checkBlackHoles()
2890  *
2891  * Check the blackhole_queue for threads that can be woken up.  We do
2892  * this periodically: before every GC, and whenever the run queue is
2893  * empty.
2894  *
2895  * An elegant solution might be to just wake up all the blocked
2896  * threads with awakenBlockedQueue occasionally: they'll go back to
2897  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2898  * doesn't give us a way to tell whether we've actually managed to
2899  * wake up any threads, so we would be busy-waiting.
2900  *
2901  * -------------------------------------------------------------------------- */
2902
2903 static rtsBool
2904 checkBlackHoles (Capability *cap)
2905 {
2906     StgTSO **prev, *t;
2907     rtsBool any_woke_up = rtsFalse;
2908     StgHalfWord type;
2909
2910     // blackhole_queue is global:
2911     ASSERT_LOCK_HELD(&sched_mutex);
2912
2913     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2914
2915     // ASSUMES: sched_mutex
2916     prev = &blackhole_queue;
2917     t = blackhole_queue;
2918     while (t != END_TSO_QUEUE) {
2919         ASSERT(t->why_blocked == BlockedOnBlackHole);
2920         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2921         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2922             IF_DEBUG(sanity,checkTSO(t));
2923             t = unblockOne(cap, t);
2924             // urk, the threads migrate to the current capability
2925             // here, but we'd like to keep them on the original one.
2926             *prev = t;
2927             any_woke_up = rtsTrue;
2928         } else {
2929             prev = &t->_link;
2930             t = t->_link;
2931         }
2932     }
2933
2934     return any_woke_up;
2935 }
2936
2937 /* -----------------------------------------------------------------------------
2938    Deleting threads
2939
2940    This is used for interruption (^C) and forking, and corresponds to
2941    raising an exception but without letting the thread catch the
2942    exception.
2943    -------------------------------------------------------------------------- */
2944
2945 static void 
2946 deleteThread (Capability *cap, StgTSO *tso)
2947 {
2948     // NOTE: must only be called on a TSO that we have exclusive
2949     // access to, because we will call throwToSingleThreaded() below.
2950     // The TSO must be on the run queue of the Capability we own, or 
2951     // we must own all Capabilities.
2952
2953     if (tso->why_blocked != BlockedOnCCall &&
2954         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2955         throwToSingleThreaded(cap,tso,NULL);
2956     }
2957 }
2958
2959 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2960 static void 
2961 deleteThread_(Capability *cap, StgTSO *tso)
2962 { // for forkProcess only:
2963   // like deleteThread(), but we delete threads in foreign calls, too.
2964
2965     if (tso->why_blocked == BlockedOnCCall ||
2966         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2967         unblockOne(cap,tso);
2968         tso->what_next = ThreadKilled;
2969     } else {
2970         deleteThread(cap,tso);
2971     }
2972 }
2973 #endif
2974
2975 /* -----------------------------------------------------------------------------
2976    raiseExceptionHelper
2977    
2978    This function is called by the raise# primitve, just so that we can
2979    move some of the tricky bits of raising an exception from C-- into
2980    C.  Who knows, it might be a useful re-useable thing here too.
2981    -------------------------------------------------------------------------- */
2982
2983 StgWord
2984 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2985 {
2986     Capability *cap = regTableToCapability(reg);
2987     StgThunk *raise_closure = NULL;
2988     StgPtr p, next;
2989     StgRetInfoTable *info;
2990     //
2991     // This closure represents the expression 'raise# E' where E
2992     // is the exception raise.  It is used to overwrite all the
2993     // thunks which are currently under evaluataion.
2994     //
2995
2996     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2997     // LDV profiling: stg_raise_info has THUNK as its closure
2998     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2999     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3000     // 1 does not cause any problem unless profiling is performed.
3001     // However, when LDV profiling goes on, we need to linearly scan
3002     // small object pool, where raise_closure is stored, so we should
3003     // use MIN_UPD_SIZE.
3004     //
3005     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3006     //                                 sizeofW(StgClosure)+1);
3007     //
3008
3009     //
3010     // Walk up the stack, looking for the catch frame.  On the way,
3011     // we update any closures pointed to from update frames with the
3012     // raise closure that we just built.
3013     //
3014     p = tso->sp;
3015     while(1) {
3016         info = get_ret_itbl((StgClosure *)p);
3017         next = p + stack_frame_sizeW((StgClosure *)p);
3018         switch (info->i.type) {
3019             
3020         case UPDATE_FRAME:
3021             // Only create raise_closure if we need to.
3022             if (raise_closure == NULL) {
3023                 raise_closure = 
3024                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3025                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3026                 raise_closure->payload[0] = exception;
3027             }
3028             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3029             p = next;
3030             continue;
3031
3032         case ATOMICALLY_FRAME:
3033             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
3034             tso->sp = p;
3035             return ATOMICALLY_FRAME;
3036             
3037         case CATCH_FRAME:
3038             tso->sp = p;
3039             return CATCH_FRAME;
3040
3041         case CATCH_STM_FRAME:
3042             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3043             tso->sp = p;
3044             return CATCH_STM_FRAME;
3045             
3046         case STOP_FRAME:
3047             tso->sp = p;
3048             return STOP_FRAME;
3049
3050         case CATCH_RETRY_FRAME:
3051         default:
3052             p = next; 
3053             continue;
3054         }
3055     }
3056 }
3057
3058
3059 /* -----------------------------------------------------------------------------
3060    findRetryFrameHelper
3061
3062    This function is called by the retry# primitive.  It traverses the stack
3063    leaving tso->sp referring to the frame which should handle the retry.  
3064
3065    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3066    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3067
3068    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3069    create) because retries are not considered to be exceptions, despite the
3070    similar implementation.
3071
3072    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3073    not be created within memory transactions.
3074    -------------------------------------------------------------------------- */
3075
3076 StgWord
3077 findRetryFrameHelper (StgTSO *tso)
3078 {
3079   StgPtr           p, next;
3080   StgRetInfoTable *info;
3081
3082   p = tso -> sp;
3083   while (1) {
3084     info = get_ret_itbl((StgClosure *)p);
3085     next = p + stack_frame_sizeW((StgClosure *)p);
3086     switch (info->i.type) {
3087       
3088     case ATOMICALLY_FRAME:
3089         debugTrace(DEBUG_stm,
3090                    "found ATOMICALLY_FRAME at %p during retry", p);
3091         tso->sp = p;
3092         return ATOMICALLY_FRAME;
3093       
3094     case CATCH_RETRY_FRAME:
3095         debugTrace(DEBUG_stm,
3096                    "found CATCH_RETRY_FRAME at %p during retrry", p);
3097         tso->sp = p;
3098         return CATCH_RETRY_FRAME;
3099       
3100     case CATCH_STM_FRAME: {
3101         StgTRecHeader *trec = tso -> trec;
3102         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3103         debugTrace(DEBUG_stm,
3104                    "found CATCH_STM_FRAME at %p during retry", p);
3105         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3106         stmAbortTransaction(tso -> cap, trec);
3107         stmFreeAbortedTRec(tso -> cap, trec);
3108         tso -> trec = outer;
3109         p = next; 
3110         continue;
3111     }
3112       
3113
3114     default:
3115       ASSERT(info->i.type != CATCH_FRAME);
3116       ASSERT(info->i.type != STOP_FRAME);
3117       p = next; 
3118       continue;
3119     }
3120   }
3121 }
3122
3123 /* -----------------------------------------------------------------------------
3124    resurrectThreads is called after garbage collection on the list of
3125    threads found to be garbage.  Each of these threads will be woken
3126    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3127    on an MVar, or NonTermination if the thread was blocked on a Black
3128    Hole.
3129
3130    Locks: assumes we hold *all* the capabilities.
3131    -------------------------------------------------------------------------- */
3132
3133 void
3134 resurrectThreads (StgTSO *threads)
3135 {
3136     StgTSO *tso, *next;
3137     Capability *cap;
3138     step *step;
3139
3140     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3141         next = tso->global_link;
3142
3143         step = Bdescr((P_)tso)->step;
3144         tso->global_link = step->threads;
3145         step->threads = tso;
3146
3147         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3148         
3149         // Wake up the thread on the Capability it was last on
3150         cap = tso->cap;
3151         
3152         switch (tso->why_blocked) {
3153         case BlockedOnMVar:
3154         case BlockedOnException:
3155             /* Called by GC - sched_mutex lock is currently held. */
3156             throwToSingleThreaded(cap, tso,
3157                                   (StgClosure *)BlockedOnDeadMVar_closure);
3158             break;
3159         case BlockedOnBlackHole:
3160             throwToSingleThreaded(cap, tso,
3161                                   (StgClosure *)NonTermination_closure);
3162             break;
3163         case BlockedOnSTM:
3164             throwToSingleThreaded(cap, tso,
3165                                   (StgClosure *)BlockedIndefinitely_closure);
3166             break;
3167         case NotBlocked:
3168             /* This might happen if the thread was blocked on a black hole
3169              * belonging to a thread that we've just woken up (raiseAsync
3170              * can wake up threads, remember...).
3171              */
3172             continue;
3173         default:
3174             barf("resurrectThreads: thread blocked in a strange way");
3175         }
3176     }
3177 }
3178
3179 /* -----------------------------------------------------------------------------
3180    performPendingThrowTos is called after garbage collection, and
3181    passed a list of threads that were found to have pending throwTos
3182    (tso->blocked_exceptions was not empty), and were blocked.
3183    Normally this doesn't happen, because we would deliver the
3184    exception directly if the target thread is blocked, but there are
3185    small windows where it might occur on a multiprocessor (see
3186    throwTo()).
3187
3188    NB. we must be holding all the capabilities at this point, just
3189    like resurrectThreads().
3190    -------------------------------------------------------------------------- */
3191
3192 void
3193 performPendingThrowTos (StgTSO *threads)
3194 {
3195     StgTSO *tso, *next;
3196     Capability *cap;
3197     step *step;
3198
3199     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3200         next = tso->global_link;
3201
3202         step = Bdescr((P_)tso)->step;
3203         tso->global_link = step->threads;
3204         step->threads = tso;
3205
3206         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
3207         
3208         cap = tso->cap;
3209         maybePerformBlockedException(cap, tso);
3210     }
3211 }