FIX #767 (withMVar family have a bug)
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "OSThreads.h"
15 #include "Storage.h"
16 #include "StgRun.h"
17 #include "Hooks.h"
18 #include "Schedule.h"
19 #include "StgMiscClosures.h"
20 #include "Interpreter.h"
21 #include "Printer.h"
22 #include "RtsSignals.h"
23 #include "Sanity.h"
24 #include "Stats.h"
25 #include "STM.h"
26 #include "Timer.h"
27 #include "Prelude.h"
28 #include "ThreadLabels.h"
29 #include "LdvProfile.h"
30 #include "Updates.h"
31 #include "Proftimer.h"
32 #include "ProfHeap.h"
33 #if defined(GRAN) || defined(PARALLEL_HASKELL)
34 # include "GranSimRts.h"
35 # include "GranSim.h"
36 # include "ParallelRts.h"
37 # include "Parallel.h"
38 # include "ParallelDebug.h"
39 # include "FetchMe.h"
40 # include "HLC.h"
41 #endif
42 #include "Sparks.h"
43 #include "Capability.h"
44 #include "Task.h"
45 #include "AwaitEvent.h"
46 #if defined(mingw32_HOST_OS)
47 #include "win32/IOManager.h"
48 #endif
49 #include "Trace.h"
50 #include "RaiseAsync.h"
51 #include "Threads.h"
52 #include "ThrIOManager.h"
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 /* -----------------------------------------------------------------------------
76  * Global variables
77  * -------------------------------------------------------------------------- */
78
79 #if defined(GRAN)
80
81 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
82 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
83
84 /* 
85    In GranSim we have a runnable and a blocked queue for each processor.
86    In order to minimise code changes new arrays run_queue_hds/tls
87    are created. run_queue_hd is then a short cut (macro) for
88    run_queue_hds[CurrentProc] (see GranSim.h).
89    -- HWL
90 */
91 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
92 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
93 StgTSO *ccalling_threadss[MAX_PROC];
94 /* We use the same global list of threads (all_threads) in GranSim as in
95    the std RTS (i.e. we are cheating). However, we don't use this list in
96    the GranSim specific code at the moment (so we are only potentially
97    cheating).  */
98
99 #else /* !GRAN */
100
101 #if !defined(THREADED_RTS)
102 // Blocked/sleeping thrads
103 StgTSO *blocked_queue_hd = NULL;
104 StgTSO *blocked_queue_tl = NULL;
105 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
106 #endif
107
108 /* Threads blocked on blackholes.
109  * LOCK: sched_mutex+capability, or all capabilities
110  */
111 StgTSO *blackhole_queue = NULL;
112 #endif
113
114 /* The blackhole_queue should be checked for threads to wake up.  See
115  * Schedule.h for more thorough comment.
116  * LOCK: none (doesn't matter if we miss an update)
117  */
118 rtsBool blackholes_need_checking = rtsFalse;
119
120 /* Linked list of all threads.
121  * Used for detecting garbage collected threads.
122  * LOCK: sched_mutex+capability, or all capabilities
123  */
124 StgTSO *all_threads = NULL;
125
126 /* flag set by signal handler to precipitate a context switch
127  * LOCK: none (just an advisory flag)
128  */
129 int context_switch = 0;
130
131 /* flag that tracks whether we have done any execution in this time slice.
132  * LOCK: currently none, perhaps we should lock (but needs to be
133  * updated in the fast path of the scheduler).
134  */
135 nat recent_activity = ACTIVITY_YES;
136
137 /* if this flag is set as well, give up execution
138  * LOCK: none (changes once, from false->true)
139  */
140 rtsBool sched_state = SCHED_RUNNING;
141
142 #if defined(GRAN)
143 StgTSO *CurrentTSO;
144 #endif
145
146 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
147  *  exists - earlier gccs apparently didn't.
148  *  -= chak
149  */
150 StgTSO dummy_tso;
151
152 /*
153  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
154  * in an MT setting, needed to signal that a worker thread shouldn't hang around
155  * in the scheduler when it is out of work.
156  */
157 rtsBool shutting_down_scheduler = rtsFalse;
158
159 /*
160  * This mutex protects most of the global scheduler data in
161  * the THREADED_RTS runtime.
162  */
163 #if defined(THREADED_RTS)
164 Mutex sched_mutex;
165 #endif
166
167 #if defined(PARALLEL_HASKELL)
168 StgTSO *LastTSO;
169 rtsTime TimeOfLastYield;
170 rtsBool emitSchedule = rtsTrue;
171 #endif
172
173 #if !defined(mingw32_HOST_OS)
174 #define FORKPROCESS_PRIMOP_SUPPORTED
175 #endif
176
177 /* -----------------------------------------------------------------------------
178  * static function prototypes
179  * -------------------------------------------------------------------------- */
180
181 static Capability *schedule (Capability *initialCapability, Task *task);
182
183 //
184 // These function all encapsulate parts of the scheduler loop, and are
185 // abstracted only to make the structure and control flow of the
186 // scheduler clearer.
187 //
188 static void schedulePreLoop (void);
189 #if defined(THREADED_RTS)
190 static void schedulePushWork(Capability *cap, Task *task);
191 #endif
192 static void scheduleStartSignalHandlers (Capability *cap);
193 static void scheduleCheckBlockedThreads (Capability *cap);
194 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
195 static void scheduleCheckBlackHoles (Capability *cap);
196 static void scheduleDetectDeadlock (Capability *cap, Task *task);
197 #if defined(GRAN)
198 static StgTSO *scheduleProcessEvent(rtsEvent *event);
199 #endif
200 #if defined(PARALLEL_HASKELL)
201 static StgTSO *scheduleSendPendingMessages(void);
202 static void scheduleActivateSpark(void);
203 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
204 #endif
205 #if defined(PAR) || defined(GRAN)
206 static void scheduleGranParReport(void);
207 #endif
208 static void schedulePostRunThread(void);
209 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
210 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
211                                          StgTSO *t);
212 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
213                                     nat prev_what_next );
214 static void scheduleHandleThreadBlocked( StgTSO *t );
215 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
216                                              StgTSO *t );
217 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
218 static Capability *scheduleDoGC(Capability *cap, Task *task,
219                                 rtsBool force_major);
220
221 static rtsBool checkBlackHoles(Capability *cap);
222
223 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
224
225 static void deleteThread (Capability *cap, StgTSO *tso);
226 static void deleteAllThreads (Capability *cap);
227
228 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
229 static void deleteThread_(Capability *cap, StgTSO *tso);
230 #endif
231
232 #if defined(PARALLEL_HASKELL)
233 StgTSO * createSparkThread(rtsSpark spark);
234 StgTSO * activateSpark (rtsSpark spark);  
235 #endif
236
237 #ifdef DEBUG
238 static char *whatNext_strs[] = {
239   "(unknown)",
240   "ThreadRunGHC",
241   "ThreadInterpret",
242   "ThreadKilled",
243   "ThreadRelocated",
244   "ThreadComplete"
245 };
246 #endif
247
248 /* -----------------------------------------------------------------------------
249  * Putting a thread on the run queue: different scheduling policies
250  * -------------------------------------------------------------------------- */
251
252 STATIC_INLINE void
253 addToRunQueue( Capability *cap, StgTSO *t )
254 {
255 #if defined(PARALLEL_HASKELL)
256     if (RtsFlags.ParFlags.doFairScheduling) { 
257         // this does round-robin scheduling; good for concurrency
258         appendToRunQueue(cap,t);
259     } else {
260         // this does unfair scheduling; good for parallelism
261         pushOnRunQueue(cap,t);
262     }
263 #else
264     // this does round-robin scheduling; good for concurrency
265     appendToRunQueue(cap,t);
266 #endif
267 }
268
269 /* ---------------------------------------------------------------------------
270    Main scheduling loop.
271
272    We use round-robin scheduling, each thread returning to the
273    scheduler loop when one of these conditions is detected:
274
275       * out of heap space
276       * timer expires (thread yields)
277       * thread blocks
278       * thread ends
279       * stack overflow
280
281    GRAN version:
282      In a GranSim setup this loop iterates over the global event queue.
283      This revolves around the global event queue, which determines what 
284      to do next. Therefore, it's more complicated than either the 
285      concurrent or the parallel (GUM) setup.
286
287    GUM version:
288      GUM iterates over incoming messages.
289      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
290      and sends out a fish whenever it has nothing to do; in-between
291      doing the actual reductions (shared code below) it processes the
292      incoming messages and deals with delayed operations 
293      (see PendingFetches).
294      This is not the ugliest code you could imagine, but it's bloody close.
295
296    ------------------------------------------------------------------------ */
297
298 static Capability *
299 schedule (Capability *initialCapability, Task *task)
300 {
301   StgTSO *t;
302   Capability *cap;
303   StgThreadReturnCode ret;
304 #if defined(GRAN)
305   rtsEvent *event;
306 #elif defined(PARALLEL_HASKELL)
307   StgTSO *tso;
308   GlobalTaskId pe;
309   rtsBool receivedFinish = rtsFalse;
310 # if defined(DEBUG)
311   nat tp_size, sp_size; // stats only
312 # endif
313 #endif
314   nat prev_what_next;
315   rtsBool ready_to_gc;
316 #if defined(THREADED_RTS)
317   rtsBool first = rtsTrue;
318 #endif
319   
320   cap = initialCapability;
321
322   // Pre-condition: this task owns initialCapability.
323   // The sched_mutex is *NOT* held
324   // NB. on return, we still hold a capability.
325
326   debugTrace (DEBUG_sched, 
327               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
328               task, initialCapability);
329
330   schedulePreLoop();
331
332   // -----------------------------------------------------------
333   // Scheduler loop starts here:
334
335 #if defined(PARALLEL_HASKELL)
336 #define TERMINATION_CONDITION        (!receivedFinish)
337 #elif defined(GRAN)
338 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
339 #else
340 #define TERMINATION_CONDITION        rtsTrue
341 #endif
342
343   while (TERMINATION_CONDITION) {
344
345 #if defined(GRAN)
346       /* Choose the processor with the next event */
347       CurrentProc = event->proc;
348       CurrentTSO = event->tso;
349 #endif
350
351 #if defined(THREADED_RTS)
352       if (first) {
353           // don't yield the first time, we want a chance to run this
354           // thread for a bit, even if there are others banging at the
355           // door.
356           first = rtsFalse;
357           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
358       } else {
359           // Yield the capability to higher-priority tasks if necessary.
360           yieldCapability(&cap, task);
361       }
362 #endif
363       
364 #if defined(THREADED_RTS)
365       schedulePushWork(cap,task);
366 #endif
367
368     // Check whether we have re-entered the RTS from Haskell without
369     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
370     // call).
371     if (cap->in_haskell) {
372           errorBelch("schedule: re-entered unsafely.\n"
373                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
374           stg_exit(EXIT_FAILURE);
375     }
376
377     // The interruption / shutdown sequence.
378     // 
379     // In order to cleanly shut down the runtime, we want to:
380     //   * make sure that all main threads return to their callers
381     //     with the state 'Interrupted'.
382     //   * clean up all OS threads assocated with the runtime
383     //   * free all memory etc.
384     //
385     // So the sequence for ^C goes like this:
386     //
387     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
388     //     arranges for some Capability to wake up
389     //
390     //   * all threads in the system are halted, and the zombies are
391     //     placed on the run queue for cleaning up.  We acquire all
392     //     the capabilities in order to delete the threads, this is
393     //     done by scheduleDoGC() for convenience (because GC already
394     //     needs to acquire all the capabilities).  We can't kill
395     //     threads involved in foreign calls.
396     // 
397     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
398     //
399     //   * sched_state := SCHED_SHUTTING_DOWN
400     //
401     //   * all workers exit when the run queue on their capability
402     //     drains.  All main threads will also exit when their TSO
403     //     reaches the head of the run queue and they can return.
404     //
405     //   * eventually all Capabilities will shut down, and the RTS can
406     //     exit.
407     //
408     //   * We might be left with threads blocked in foreign calls, 
409     //     we should really attempt to kill these somehow (TODO);
410     
411     switch (sched_state) {
412     case SCHED_RUNNING:
413         break;
414     case SCHED_INTERRUPTING:
415         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
416 #if defined(THREADED_RTS)
417         discardSparksCap(cap);
418 #endif
419         /* scheduleDoGC() deletes all the threads */
420         cap = scheduleDoGC(cap,task,rtsFalse);
421         break;
422     case SCHED_SHUTTING_DOWN:
423         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
424         // If we are a worker, just exit.  If we're a bound thread
425         // then we will exit below when we've removed our TSO from
426         // the run queue.
427         if (task->tso == NULL && emptyRunQueue(cap)) {
428             return cap;
429         }
430         break;
431     default:
432         barf("sched_state: %d", sched_state);
433     }
434
435 #if defined(THREADED_RTS)
436     // If the run queue is empty, take a spark and turn it into a thread.
437     {
438         if (emptyRunQueue(cap)) {
439             StgClosure *spark;
440             spark = findSpark(cap);
441             if (spark != NULL) {
442                 debugTrace(DEBUG_sched,
443                            "turning spark of closure %p into a thread",
444                            (StgClosure *)spark);
445                 createSparkThread(cap,spark);     
446             }
447         }
448     }
449 #endif // THREADED_RTS
450
451     scheduleStartSignalHandlers(cap);
452
453     // Only check the black holes here if we've nothing else to do.
454     // During normal execution, the black hole list only gets checked
455     // at GC time, to avoid repeatedly traversing this possibly long
456     // list each time around the scheduler.
457     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
458
459     scheduleCheckWakeupThreads(cap);
460
461     scheduleCheckBlockedThreads(cap);
462
463     scheduleDetectDeadlock(cap,task);
464 #if defined(THREADED_RTS)
465     cap = task->cap;    // reload cap, it might have changed
466 #endif
467
468     // Normally, the only way we can get here with no threads to
469     // run is if a keyboard interrupt received during 
470     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
471     // Additionally, it is not fatal for the
472     // threaded RTS to reach here with no threads to run.
473     //
474     // win32: might be here due to awaitEvent() being abandoned
475     // as a result of a console event having been delivered.
476     if ( emptyRunQueue(cap) ) {
477 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
478         ASSERT(sched_state >= SCHED_INTERRUPTING);
479 #endif
480         continue; // nothing to do
481     }
482
483 #if defined(PARALLEL_HASKELL)
484     scheduleSendPendingMessages();
485     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
486         continue;
487
488 #if defined(SPARKS)
489     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
490 #endif
491
492     /* If we still have no work we need to send a FISH to get a spark
493        from another PE */
494     if (emptyRunQueue(cap)) {
495         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
496         ASSERT(rtsFalse); // should not happen at the moment
497     }
498     // from here: non-empty run queue.
499     //  TODO: merge above case with this, only one call processMessages() !
500     if (PacketsWaiting()) {  /* process incoming messages, if
501                                 any pending...  only in else
502                                 because getRemoteWork waits for
503                                 messages as well */
504         receivedFinish = processMessages();
505     }
506 #endif
507
508 #if defined(GRAN)
509     scheduleProcessEvent(event);
510 #endif
511
512     // 
513     // Get a thread to run
514     //
515     t = popRunQueue(cap);
516
517 #if defined(GRAN) || defined(PAR)
518     scheduleGranParReport(); // some kind of debuging output
519 #else
520     // Sanity check the thread we're about to run.  This can be
521     // expensive if there is lots of thread switching going on...
522     IF_DEBUG(sanity,checkTSO(t));
523 #endif
524
525 #if defined(THREADED_RTS)
526     // Check whether we can run this thread in the current task.
527     // If not, we have to pass our capability to the right task.
528     {
529         Task *bound = t->bound;
530       
531         if (bound) {
532             if (bound == task) {
533                 debugTrace(DEBUG_sched,
534                            "### Running thread %lu in bound thread", (unsigned long)t->id);
535                 // yes, the Haskell thread is bound to the current native thread
536             } else {
537                 debugTrace(DEBUG_sched,
538                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
539                 // no, bound to a different Haskell thread: pass to that thread
540                 pushOnRunQueue(cap,t);
541                 continue;
542             }
543         } else {
544             // The thread we want to run is unbound.
545             if (task->tso) { 
546                 debugTrace(DEBUG_sched,
547                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
548                 // no, the current native thread is bound to a different
549                 // Haskell thread, so pass it to any worker thread
550                 pushOnRunQueue(cap,t);
551                 continue; 
552             }
553         }
554     }
555 #endif
556
557     cap->r.rCurrentTSO = t;
558     
559     /* context switches are initiated by the timer signal, unless
560      * the user specified "context switch as often as possible", with
561      * +RTS -C0
562      */
563     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
564         && !emptyThreadQueues(cap)) {
565         context_switch = 1;
566     }
567          
568 run_thread:
569
570     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
571                               (long)t->id, whatNext_strs[t->what_next]);
572
573     startHeapProfTimer();
574
575     // Check for exceptions blocked on this thread
576     maybePerformBlockedException (cap, t);
577
578     // ----------------------------------------------------------------------
579     // Run the current thread 
580
581     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
582     ASSERT(t->cap == cap);
583
584     prev_what_next = t->what_next;
585
586     errno = t->saved_errno;
587 #if mingw32_HOST_OS
588     SetLastError(t->saved_winerror);
589 #endif
590
591     cap->in_haskell = rtsTrue;
592
593     dirtyTSO(t);
594
595     recent_activity = ACTIVITY_YES;
596
597     switch (prev_what_next) {
598         
599     case ThreadKilled:
600     case ThreadComplete:
601         /* Thread already finished, return to scheduler. */
602         ret = ThreadFinished;
603         break;
604         
605     case ThreadRunGHC:
606     {
607         StgRegTable *r;
608         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
609         cap = regTableToCapability(r);
610         ret = r->rRet;
611         break;
612     }
613     
614     case ThreadInterpret:
615         cap = interpretBCO(cap);
616         ret = cap->r.rRet;
617         break;
618         
619     default:
620         barf("schedule: invalid what_next field");
621     }
622
623     cap->in_haskell = rtsFalse;
624
625     // The TSO might have moved, eg. if it re-entered the RTS and a GC
626     // happened.  So find the new location:
627     t = cap->r.rCurrentTSO;
628
629     // We have run some Haskell code: there might be blackhole-blocked
630     // threads to wake up now.
631     // Lock-free test here should be ok, we're just setting a flag.
632     if ( blackhole_queue != END_TSO_QUEUE ) {
633         blackholes_need_checking = rtsTrue;
634     }
635     
636     // And save the current errno in this thread.
637     // XXX: possibly bogus for SMP because this thread might already
638     // be running again, see code below.
639     t->saved_errno = errno;
640 #if mingw32_HOST_OS
641     // Similarly for Windows error code
642     t->saved_winerror = GetLastError();
643 #endif
644
645 #if defined(THREADED_RTS)
646     // If ret is ThreadBlocked, and this Task is bound to the TSO that
647     // blocked, we are in limbo - the TSO is now owned by whatever it
648     // is blocked on, and may in fact already have been woken up,
649     // perhaps even on a different Capability.  It may be the case
650     // that task->cap != cap.  We better yield this Capability
651     // immediately and return to normaility.
652     if (ret == ThreadBlocked) {
653         debugTrace(DEBUG_sched,
654                    "--<< thread %lu (%s) stopped: blocked",
655                    (unsigned long)t->id, whatNext_strs[t->what_next]);
656         continue;
657     }
658 #endif
659
660     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
661     ASSERT(t->cap == cap);
662
663     // ----------------------------------------------------------------------
664     
665     // Costs for the scheduler are assigned to CCS_SYSTEM
666     stopHeapProfTimer();
667 #if defined(PROFILING)
668     CCCS = CCS_SYSTEM;
669 #endif
670     
671     schedulePostRunThread();
672
673     ready_to_gc = rtsFalse;
674
675     switch (ret) {
676     case HeapOverflow:
677         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
678         break;
679
680     case StackOverflow:
681         scheduleHandleStackOverflow(cap,task,t);
682         break;
683
684     case ThreadYielding:
685         if (scheduleHandleYield(cap, t, prev_what_next)) {
686             // shortcut for switching between compiler/interpreter:
687             goto run_thread; 
688         }
689         break;
690
691     case ThreadBlocked:
692         scheduleHandleThreadBlocked(t);
693         break;
694
695     case ThreadFinished:
696         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
697         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
698         break;
699
700     default:
701       barf("schedule: invalid thread return code %d", (int)ret);
702     }
703
704     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
705       cap = scheduleDoGC(cap,task,rtsFalse);
706     }
707   } /* end of while() */
708
709   debugTrace(PAR_DEBUG_verbose,
710              "== Leaving schedule() after having received Finish");
711 }
712
713 /* ----------------------------------------------------------------------------
714  * Setting up the scheduler loop
715  * ------------------------------------------------------------------------- */
716
717 static void
718 schedulePreLoop(void)
719 {
720 #if defined(GRAN) 
721     /* set up first event to get things going */
722     /* ToDo: assign costs for system setup and init MainTSO ! */
723     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
724               ContinueThread, 
725               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
726     
727     debugTrace (DEBUG_gran,
728                 "GRAN: Init CurrentTSO (in schedule) = %p", 
729                 CurrentTSO);
730     IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
731     
732     if (RtsFlags.GranFlags.Light) {
733         /* Save current time; GranSim Light only */
734         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
735     }      
736 #endif
737 }
738
739 /* -----------------------------------------------------------------------------
740  * schedulePushWork()
741  *
742  * Push work to other Capabilities if we have some.
743  * -------------------------------------------------------------------------- */
744
745 #if defined(THREADED_RTS)
746 static void
747 schedulePushWork(Capability *cap USED_IF_THREADS, 
748                  Task *task      USED_IF_THREADS)
749 {
750     Capability *free_caps[n_capabilities], *cap0;
751     nat i, n_free_caps;
752
753     // migration can be turned off with +RTS -qg
754     if (!RtsFlags.ParFlags.migrate) return;
755
756     // Check whether we have more threads on our run queue, or sparks
757     // in our pool, that we could hand to another Capability.
758     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
759         && sparkPoolSizeCap(cap) < 2) {
760         return;
761     }
762
763     // First grab as many free Capabilities as we can.
764     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
765         cap0 = &capabilities[i];
766         if (cap != cap0 && tryGrabCapability(cap0,task)) {
767             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
768                 // it already has some work, we just grabbed it at 
769                 // the wrong moment.  Or maybe it's deadlocked!
770                 releaseCapability(cap0);
771             } else {
772                 free_caps[n_free_caps++] = cap0;
773             }
774         }
775     }
776
777     // we now have n_free_caps free capabilities stashed in
778     // free_caps[].  Share our run queue equally with them.  This is
779     // probably the simplest thing we could do; improvements we might
780     // want to do include:
781     //
782     //   - giving high priority to moving relatively new threads, on 
783     //     the gournds that they haven't had time to build up a
784     //     working set in the cache on this CPU/Capability.
785     //
786     //   - giving low priority to moving long-lived threads
787
788     if (n_free_caps > 0) {
789         StgTSO *prev, *t, *next;
790         rtsBool pushed_to_all;
791
792         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
793
794         i = 0;
795         pushed_to_all = rtsFalse;
796
797         if (cap->run_queue_hd != END_TSO_QUEUE) {
798             prev = cap->run_queue_hd;
799             t = prev->link;
800             prev->link = END_TSO_QUEUE;
801             for (; t != END_TSO_QUEUE; t = next) {
802                 next = t->link;
803                 t->link = END_TSO_QUEUE;
804                 if (t->what_next == ThreadRelocated
805                     || t->bound == task // don't move my bound thread
806                     || tsoLocked(t)) {  // don't move a locked thread
807                     prev->link = t;
808                     prev = t;
809                 } else if (i == n_free_caps) {
810                     pushed_to_all = rtsTrue;
811                     i = 0;
812                     // keep one for us
813                     prev->link = t;
814                     prev = t;
815                 } else {
816                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
817                     appendToRunQueue(free_caps[i],t);
818                     if (t->bound) { t->bound->cap = free_caps[i]; }
819                     t->cap = free_caps[i];
820                     i++;
821                 }
822             }
823             cap->run_queue_tl = prev;
824         }
825
826         // If there are some free capabilities that we didn't push any
827         // threads to, then try to push a spark to each one.
828         if (!pushed_to_all) {
829             StgClosure *spark;
830             // i is the next free capability to push to
831             for (; i < n_free_caps; i++) {
832                 if (emptySparkPoolCap(free_caps[i])) {
833                     spark = findSpark(cap);
834                     if (spark != NULL) {
835                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
836                         newSpark(&(free_caps[i]->r), spark);
837                     }
838                 }
839             }
840         }
841
842         // release the capabilities
843         for (i = 0; i < n_free_caps; i++) {
844             task->cap = free_caps[i];
845             releaseCapability(free_caps[i]);
846         }
847     }
848     task->cap = cap; // reset to point to our Capability.
849 }
850 #endif
851
852 /* ----------------------------------------------------------------------------
853  * Start any pending signal handlers
854  * ------------------------------------------------------------------------- */
855
856 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
857 static void
858 scheduleStartSignalHandlers(Capability *cap)
859 {
860     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
861         // safe outside the lock
862         startSignalHandlers(cap);
863     }
864 }
865 #else
866 static void
867 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
868 {
869 }
870 #endif
871
872 /* ----------------------------------------------------------------------------
873  * Check for blocked threads that can be woken up.
874  * ------------------------------------------------------------------------- */
875
876 static void
877 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
878 {
879 #if !defined(THREADED_RTS)
880     //
881     // Check whether any waiting threads need to be woken up.  If the
882     // run queue is empty, and there are no other tasks running, we
883     // can wait indefinitely for something to happen.
884     //
885     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
886     {
887         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
888     }
889 #endif
890 }
891
892
893 /* ----------------------------------------------------------------------------
894  * Check for threads woken up by other Capabilities
895  * ------------------------------------------------------------------------- */
896
897 static void
898 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
899 {
900 #if defined(THREADED_RTS)
901     // Any threads that were woken up by other Capabilities get
902     // appended to our run queue.
903     if (!emptyWakeupQueue(cap)) {
904         ACQUIRE_LOCK(&cap->lock);
905         if (emptyRunQueue(cap)) {
906             cap->run_queue_hd = cap->wakeup_queue_hd;
907             cap->run_queue_tl = cap->wakeup_queue_tl;
908         } else {
909             cap->run_queue_tl->link = cap->wakeup_queue_hd;
910             cap->run_queue_tl = cap->wakeup_queue_tl;
911         }
912         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
913         RELEASE_LOCK(&cap->lock);
914     }
915 #endif
916 }
917
918 /* ----------------------------------------------------------------------------
919  * Check for threads blocked on BLACKHOLEs that can be woken up
920  * ------------------------------------------------------------------------- */
921 static void
922 scheduleCheckBlackHoles (Capability *cap)
923 {
924     if ( blackholes_need_checking ) // check without the lock first
925     {
926         ACQUIRE_LOCK(&sched_mutex);
927         if ( blackholes_need_checking ) {
928             checkBlackHoles(cap);
929             blackholes_need_checking = rtsFalse;
930         }
931         RELEASE_LOCK(&sched_mutex);
932     }
933 }
934
935 /* ----------------------------------------------------------------------------
936  * Detect deadlock conditions and attempt to resolve them.
937  * ------------------------------------------------------------------------- */
938
939 static void
940 scheduleDetectDeadlock (Capability *cap, Task *task)
941 {
942
943 #if defined(PARALLEL_HASKELL)
944     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
945     return;
946 #endif
947
948     /* 
949      * Detect deadlock: when we have no threads to run, there are no
950      * threads blocked, waiting for I/O, or sleeping, and all the
951      * other tasks are waiting for work, we must have a deadlock of
952      * some description.
953      */
954     if ( emptyThreadQueues(cap) )
955     {
956 #if defined(THREADED_RTS)
957         /* 
958          * In the threaded RTS, we only check for deadlock if there
959          * has been no activity in a complete timeslice.  This means
960          * we won't eagerly start a full GC just because we don't have
961          * any threads to run currently.
962          */
963         if (recent_activity != ACTIVITY_INACTIVE) return;
964 #endif
965
966         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
967
968         // Garbage collection can release some new threads due to
969         // either (a) finalizers or (b) threads resurrected because
970         // they are unreachable and will therefore be sent an
971         // exception.  Any threads thus released will be immediately
972         // runnable.
973         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
974
975         recent_activity = ACTIVITY_DONE_GC;
976         
977         if ( !emptyRunQueue(cap) ) return;
978
979 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
980         /* If we have user-installed signal handlers, then wait
981          * for signals to arrive rather then bombing out with a
982          * deadlock.
983          */
984         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
985             debugTrace(DEBUG_sched,
986                        "still deadlocked, waiting for signals...");
987
988             awaitUserSignals();
989
990             if (signals_pending()) {
991                 startSignalHandlers(cap);
992             }
993
994             // either we have threads to run, or we were interrupted:
995             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
996         }
997 #endif
998
999 #if !defined(THREADED_RTS)
1000         /* Probably a real deadlock.  Send the current main thread the
1001          * Deadlock exception.
1002          */
1003         if (task->tso) {
1004             switch (task->tso->why_blocked) {
1005             case BlockedOnSTM:
1006             case BlockedOnBlackHole:
1007             case BlockedOnException:
1008             case BlockedOnMVar:
1009                 throwToSingleThreaded(cap, task->tso, 
1010                                       (StgClosure *)NonTermination_closure);
1011                 return;
1012             default:
1013                 barf("deadlock: main thread blocked in a strange way");
1014             }
1015         }
1016         return;
1017 #endif
1018     }
1019 }
1020
1021 /* ----------------------------------------------------------------------------
1022  * Process an event (GRAN only)
1023  * ------------------------------------------------------------------------- */
1024
1025 #if defined(GRAN)
1026 static StgTSO *
1027 scheduleProcessEvent(rtsEvent *event)
1028 {
1029     StgTSO *t;
1030
1031     if (RtsFlags.GranFlags.Light)
1032       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1033
1034     /* adjust time based on time-stamp */
1035     if (event->time > CurrentTime[CurrentProc] &&
1036         event->evttype != ContinueThread)
1037       CurrentTime[CurrentProc] = event->time;
1038     
1039     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1040     if (!RtsFlags.GranFlags.Light)
1041       handleIdlePEs();
1042
1043     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1044
1045     /* main event dispatcher in GranSim */
1046     switch (event->evttype) {
1047       /* Should just be continuing execution */
1048     case ContinueThread:
1049       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1050       /* ToDo: check assertion
1051       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1052              run_queue_hd != END_TSO_QUEUE);
1053       */
1054       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1055       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1056           procStatus[CurrentProc]==Fetching) {
1057         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1058               CurrentTSO->id, CurrentTSO, CurrentProc);
1059         goto next_thread;
1060       } 
1061       /* Ignore ContinueThreads for completed threads */
1062       if (CurrentTSO->what_next == ThreadComplete) {
1063         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1064               CurrentTSO->id, CurrentTSO, CurrentProc);
1065         goto next_thread;
1066       } 
1067       /* Ignore ContinueThreads for threads that are being migrated */
1068       if (PROCS(CurrentTSO)==Nowhere) { 
1069         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1070               CurrentTSO->id, CurrentTSO, CurrentProc);
1071         goto next_thread;
1072       }
1073       /* The thread should be at the beginning of the run queue */
1074       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1075         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1076               CurrentTSO->id, CurrentTSO, CurrentProc);
1077         break; // run the thread anyway
1078       }
1079       /*
1080       new_event(proc, proc, CurrentTime[proc],
1081                 FindWork,
1082                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1083       goto next_thread; 
1084       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1085       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1086
1087     case FetchNode:
1088       do_the_fetchnode(event);
1089       goto next_thread;             /* handle next event in event queue  */
1090       
1091     case GlobalBlock:
1092       do_the_globalblock(event);
1093       goto next_thread;             /* handle next event in event queue  */
1094       
1095     case FetchReply:
1096       do_the_fetchreply(event);
1097       goto next_thread;             /* handle next event in event queue  */
1098       
1099     case UnblockThread:   /* Move from the blocked queue to the tail of */
1100       do_the_unblock(event);
1101       goto next_thread;             /* handle next event in event queue  */
1102       
1103     case ResumeThread:  /* Move from the blocked queue to the tail of */
1104       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1105       event->tso->gran.blocktime += 
1106         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1107       do_the_startthread(event);
1108       goto next_thread;             /* handle next event in event queue  */
1109       
1110     case StartThread:
1111       do_the_startthread(event);
1112       goto next_thread;             /* handle next event in event queue  */
1113       
1114     case MoveThread:
1115       do_the_movethread(event);
1116       goto next_thread;             /* handle next event in event queue  */
1117       
1118     case MoveSpark:
1119       do_the_movespark(event);
1120       goto next_thread;             /* handle next event in event queue  */
1121       
1122     case FindWork:
1123       do_the_findwork(event);
1124       goto next_thread;             /* handle next event in event queue  */
1125       
1126     default:
1127       barf("Illegal event type %u\n", event->evttype);
1128     }  /* switch */
1129     
1130     /* This point was scheduler_loop in the old RTS */
1131
1132     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1133
1134     TimeOfLastEvent = CurrentTime[CurrentProc];
1135     TimeOfNextEvent = get_time_of_next_event();
1136     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1137     // CurrentTSO = ThreadQueueHd;
1138
1139     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1140                          TimeOfNextEvent));
1141
1142     if (RtsFlags.GranFlags.Light) 
1143       GranSimLight_leave_system(event, &ActiveTSO); 
1144
1145     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1146
1147     IF_DEBUG(gran, 
1148              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1149
1150     /* in a GranSim setup the TSO stays on the run queue */
1151     t = CurrentTSO;
1152     /* Take a thread from the run queue. */
1153     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1154
1155     IF_DEBUG(gran, 
1156              debugBelch("GRAN: About to run current thread, which is\n");
1157              G_TSO(t,5));
1158
1159     context_switch = 0; // turned on via GranYield, checking events and time slice
1160
1161     IF_DEBUG(gran, 
1162              DumpGranEvent(GR_SCHEDULE, t));
1163
1164     procStatus[CurrentProc] = Busy;
1165 }
1166 #endif // GRAN
1167
1168 /* ----------------------------------------------------------------------------
1169  * Send pending messages (PARALLEL_HASKELL only)
1170  * ------------------------------------------------------------------------- */
1171
1172 #if defined(PARALLEL_HASKELL)
1173 static StgTSO *
1174 scheduleSendPendingMessages(void)
1175 {
1176     StgSparkPool *pool;
1177     rtsSpark spark;
1178     StgTSO *t;
1179
1180 # if defined(PAR) // global Mem.Mgmt., omit for now
1181     if (PendingFetches != END_BF_QUEUE) {
1182         processFetches();
1183     }
1184 # endif
1185     
1186     if (RtsFlags.ParFlags.BufferTime) {
1187         // if we use message buffering, we must send away all message
1188         // packets which have become too old...
1189         sendOldBuffers(); 
1190     }
1191 }
1192 #endif
1193
1194 /* ----------------------------------------------------------------------------
1195  * Activate spark threads (PARALLEL_HASKELL only)
1196  * ------------------------------------------------------------------------- */
1197
1198 #if defined(PARALLEL_HASKELL)
1199 static void
1200 scheduleActivateSpark(void)
1201 {
1202 #if defined(SPARKS)
1203   ASSERT(emptyRunQueue());
1204 /* We get here if the run queue is empty and want some work.
1205    We try to turn a spark into a thread, and add it to the run queue,
1206    from where it will be picked up in the next iteration of the scheduler
1207    loop.
1208 */
1209
1210       /* :-[  no local threads => look out for local sparks */
1211       /* the spark pool for the current PE */
1212       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1213       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1214           pool->hd < pool->tl) {
1215         /* 
1216          * ToDo: add GC code check that we really have enough heap afterwards!!
1217          * Old comment:
1218          * If we're here (no runnable threads) and we have pending
1219          * sparks, we must have a space problem.  Get enough space
1220          * to turn one of those pending sparks into a
1221          * thread... 
1222          */
1223
1224         spark = findSpark(rtsFalse);            /* get a spark */
1225         if (spark != (rtsSpark) NULL) {
1226           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1227           IF_PAR_DEBUG(fish, // schedule,
1228                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1229                              tso->id, tso, advisory_thread_count));
1230
1231           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1232             IF_PAR_DEBUG(fish, // schedule,
1233                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1234                             spark));
1235             return rtsFalse; /* failed to generate a thread */
1236           }                  /* otherwise fall through & pick-up new tso */
1237         } else {
1238           IF_PAR_DEBUG(fish, // schedule,
1239                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1240                              spark_queue_len(pool)));
1241           return rtsFalse;  /* failed to generate a thread */
1242         }
1243         return rtsTrue;  /* success in generating a thread */
1244   } else { /* no more threads permitted or pool empty */
1245     return rtsFalse;  /* failed to generateThread */
1246   }
1247 #else
1248   tso = NULL; // avoid compiler warning only
1249   return rtsFalse;  /* dummy in non-PAR setup */
1250 #endif // SPARKS
1251 }
1252 #endif // PARALLEL_HASKELL
1253
1254 /* ----------------------------------------------------------------------------
1255  * Get work from a remote node (PARALLEL_HASKELL only)
1256  * ------------------------------------------------------------------------- */
1257     
1258 #if defined(PARALLEL_HASKELL)
1259 static rtsBool
1260 scheduleGetRemoteWork(rtsBool *receivedFinish)
1261 {
1262   ASSERT(emptyRunQueue());
1263
1264   if (RtsFlags.ParFlags.BufferTime) {
1265         IF_PAR_DEBUG(verbose, 
1266                 debugBelch("...send all pending data,"));
1267         {
1268           nat i;
1269           for (i=1; i<=nPEs; i++)
1270             sendImmediately(i); // send all messages away immediately
1271         }
1272   }
1273 # ifndef SPARKS
1274         //++EDEN++ idle() , i.e. send all buffers, wait for work
1275         // suppress fishing in EDEN... just look for incoming messages
1276         // (blocking receive)
1277   IF_PAR_DEBUG(verbose, 
1278                debugBelch("...wait for incoming messages...\n"));
1279   *receivedFinish = processMessages(); // blocking receive...
1280
1281         // and reenter scheduling loop after having received something
1282         // (return rtsFalse below)
1283
1284 # else /* activate SPARKS machinery */
1285 /* We get here, if we have no work, tried to activate a local spark, but still
1286    have no work. We try to get a remote spark, by sending a FISH message.
1287    Thread migration should be added here, and triggered when a sequence of 
1288    fishes returns without work. */
1289         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1290
1291       /* =8-[  no local sparks => look for work on other PEs */
1292         /*
1293          * We really have absolutely no work.  Send out a fish
1294          * (there may be some out there already), and wait for
1295          * something to arrive.  We clearly can't run any threads
1296          * until a SCHEDULE or RESUME arrives, and so that's what
1297          * we're hoping to see.  (Of course, we still have to
1298          * respond to other types of messages.)
1299          */
1300         rtsTime now = msTime() /*CURRENT_TIME*/;
1301         IF_PAR_DEBUG(verbose, 
1302                      debugBelch("--  now=%ld\n", now));
1303         IF_PAR_DEBUG(fish, // verbose,
1304              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1305                  (last_fish_arrived_at!=0 &&
1306                   last_fish_arrived_at+delay > now)) {
1307                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1308                      now, last_fish_arrived_at+delay, 
1309                      last_fish_arrived_at,
1310                      delay);
1311              });
1312   
1313         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1314             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1315           if (last_fish_arrived_at==0 ||
1316               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1317             /* outstandingFishes is set in sendFish, processFish;
1318                avoid flooding system with fishes via delay */
1319     next_fish_to_send_at = 0;  
1320   } else {
1321     /* ToDo: this should be done in the main scheduling loop to avoid the
1322              busy wait here; not so bad if fish delay is very small  */
1323     int iq = 0; // DEBUGGING -- HWL
1324     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1325     /* send a fish when ready, but process messages that arrive in the meantime */
1326     do {
1327       if (PacketsWaiting()) {
1328         iq++; // DEBUGGING
1329         *receivedFinish = processMessages();
1330       }
1331       now = msTime();
1332     } while (!*receivedFinish || now<next_fish_to_send_at);
1333     // JB: This means the fish could become obsolete, if we receive
1334     // work. Better check for work again? 
1335     // last line: while (!receivedFinish || !haveWork || now<...)
1336     // next line: if (receivedFinish || haveWork )
1337
1338     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1339       return rtsFalse;  // NB: this will leave scheduler loop
1340                         // immediately after return!
1341                           
1342     IF_PAR_DEBUG(fish, // verbose,
1343                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1344
1345   }
1346
1347     // JB: IMHO, this should all be hidden inside sendFish(...)
1348     /* pe = choosePE(); 
1349        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1350                 NEW_FISH_HUNGER);
1351
1352     // Global statistics: count no. of fishes
1353     if (RtsFlags.ParFlags.ParStats.Global &&
1354          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1355            globalParStats.tot_fish_mess++;
1356            }
1357     */ 
1358
1359   /* delayed fishes must have been sent by now! */
1360   next_fish_to_send_at = 0;  
1361   }
1362       
1363   *receivedFinish = processMessages();
1364 # endif /* SPARKS */
1365
1366  return rtsFalse;
1367  /* NB: this function always returns rtsFalse, meaning the scheduler
1368     loop continues with the next iteration; 
1369     rationale: 
1370       return code means success in finding work; we enter this function
1371       if there is no local work, thus have to send a fish which takes
1372       time until it arrives with work; in the meantime we should process
1373       messages in the main loop;
1374  */
1375 }
1376 #endif // PARALLEL_HASKELL
1377
1378 /* ----------------------------------------------------------------------------
1379  * PAR/GRAN: Report stats & debugging info(?)
1380  * ------------------------------------------------------------------------- */
1381
1382 #if defined(PAR) || defined(GRAN)
1383 static void
1384 scheduleGranParReport(void)
1385 {
1386   ASSERT(run_queue_hd != END_TSO_QUEUE);
1387
1388   /* Take a thread from the run queue, if we have work */
1389   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1390
1391     /* If this TSO has got its outport closed in the meantime, 
1392      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1393      * It has to be marked as TH_DEAD for this purpose.
1394      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1395
1396 JB: TODO: investigate wether state change field could be nuked
1397      entirely and replaced by the normal tso state (whatnext
1398      field). All we want to do is to kill tsos from outside.
1399      */
1400
1401     /* ToDo: write something to the log-file
1402     if (RTSflags.ParFlags.granSimStats && !sameThread)
1403         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1404
1405     CurrentTSO = t;
1406     */
1407     /* the spark pool for the current PE */
1408     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1409
1410     IF_DEBUG(scheduler, 
1411              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1412                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1413
1414     IF_PAR_DEBUG(fish,
1415              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1416                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1417
1418     if (RtsFlags.ParFlags.ParStats.Full && 
1419         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1420         (emitSchedule || // forced emit
1421          (t && LastTSO && t->id != LastTSO->id))) {
1422       /* 
1423          we are running a different TSO, so write a schedule event to log file
1424          NB: If we use fair scheduling we also have to write  a deschedule 
1425              event for LastTSO; with unfair scheduling we know that the
1426              previous tso has blocked whenever we switch to another tso, so
1427              we don't need it in GUM for now
1428       */
1429       IF_PAR_DEBUG(fish, // schedule,
1430                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1431
1432       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1433                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1434       emitSchedule = rtsFalse;
1435     }
1436 }     
1437 #endif
1438
1439 /* ----------------------------------------------------------------------------
1440  * After running a thread...
1441  * ------------------------------------------------------------------------- */
1442
1443 static void
1444 schedulePostRunThread(void)
1445 {
1446 #if defined(PAR)
1447     /* HACK 675: if the last thread didn't yield, make sure to print a 
1448        SCHEDULE event to the log file when StgRunning the next thread, even
1449        if it is the same one as before */
1450     LastTSO = t; 
1451     TimeOfLastYield = CURRENT_TIME;
1452 #endif
1453
1454   /* some statistics gathering in the parallel case */
1455
1456 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1457   switch (ret) {
1458     case HeapOverflow:
1459 # if defined(GRAN)
1460       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1461       globalGranStats.tot_heapover++;
1462 # elif defined(PAR)
1463       globalParStats.tot_heapover++;
1464 # endif
1465       break;
1466
1467      case StackOverflow:
1468 # if defined(GRAN)
1469       IF_DEBUG(gran, 
1470                DumpGranEvent(GR_DESCHEDULE, t));
1471       globalGranStats.tot_stackover++;
1472 # elif defined(PAR)
1473       // IF_DEBUG(par, 
1474       // DumpGranEvent(GR_DESCHEDULE, t);
1475       globalParStats.tot_stackover++;
1476 # endif
1477       break;
1478
1479     case ThreadYielding:
1480 # if defined(GRAN)
1481       IF_DEBUG(gran, 
1482                DumpGranEvent(GR_DESCHEDULE, t));
1483       globalGranStats.tot_yields++;
1484 # elif defined(PAR)
1485       // IF_DEBUG(par, 
1486       // DumpGranEvent(GR_DESCHEDULE, t);
1487       globalParStats.tot_yields++;
1488 # endif
1489       break; 
1490
1491     case ThreadBlocked:
1492 # if defined(GRAN)
1493         debugTrace(DEBUG_sched, 
1494                    "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1495                    t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1496                    (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1497                if (t->block_info.closure!=(StgClosure*)NULL)
1498                  print_bq(t->block_info.closure);
1499                debugBelch("\n"));
1500
1501       // ??? needed; should emit block before
1502       IF_DEBUG(gran, 
1503                DumpGranEvent(GR_DESCHEDULE, t)); 
1504       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1505       /*
1506         ngoq Dogh!
1507       ASSERT(procStatus[CurrentProc]==Busy || 
1508               ((procStatus[CurrentProc]==Fetching) && 
1509               (t->block_info.closure!=(StgClosure*)NULL)));
1510       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1511           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1512             procStatus[CurrentProc]==Fetching)) 
1513         procStatus[CurrentProc] = Idle;
1514       */
1515 # elif defined(PAR)
1516 //++PAR++  blockThread() writes the event (change?)
1517 # endif
1518     break;
1519
1520   case ThreadFinished:
1521     break;
1522
1523   default:
1524     barf("parGlobalStats: unknown return code");
1525     break;
1526     }
1527 #endif
1528 }
1529
1530 /* -----------------------------------------------------------------------------
1531  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1532  * -------------------------------------------------------------------------- */
1533
1534 static rtsBool
1535 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1536 {
1537     // did the task ask for a large block?
1538     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1539         // if so, get one and push it on the front of the nursery.
1540         bdescr *bd;
1541         lnat blocks;
1542         
1543         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1544         
1545         debugTrace(DEBUG_sched,
1546                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1547                    (long)t->id, whatNext_strs[t->what_next], blocks);
1548     
1549         // don't do this if the nursery is (nearly) full, we'll GC first.
1550         if (cap->r.rCurrentNursery->link != NULL ||
1551             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1552                                                // if the nursery has only one block.
1553             
1554             ACQUIRE_SM_LOCK
1555             bd = allocGroup( blocks );
1556             RELEASE_SM_LOCK
1557             cap->r.rNursery->n_blocks += blocks;
1558             
1559             // link the new group into the list
1560             bd->link = cap->r.rCurrentNursery;
1561             bd->u.back = cap->r.rCurrentNursery->u.back;
1562             if (cap->r.rCurrentNursery->u.back != NULL) {
1563                 cap->r.rCurrentNursery->u.back->link = bd;
1564             } else {
1565 #if !defined(THREADED_RTS)
1566                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1567                        g0s0 == cap->r.rNursery);
1568 #endif
1569                 cap->r.rNursery->blocks = bd;
1570             }             
1571             cap->r.rCurrentNursery->u.back = bd;
1572             
1573             // initialise it as a nursery block.  We initialise the
1574             // step, gen_no, and flags field of *every* sub-block in
1575             // this large block, because this is easier than making
1576             // sure that we always find the block head of a large
1577             // block whenever we call Bdescr() (eg. evacuate() and
1578             // isAlive() in the GC would both have to do this, at
1579             // least).
1580             { 
1581                 bdescr *x;
1582                 for (x = bd; x < bd + blocks; x++) {
1583                     x->step = cap->r.rNursery;
1584                     x->gen_no = 0;
1585                     x->flags = 0;
1586                 }
1587             }
1588             
1589             // This assert can be a killer if the app is doing lots
1590             // of large block allocations.
1591             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1592             
1593             // now update the nursery to point to the new block
1594             cap->r.rCurrentNursery = bd;
1595             
1596             // we might be unlucky and have another thread get on the
1597             // run queue before us and steal the large block, but in that
1598             // case the thread will just end up requesting another large
1599             // block.
1600             pushOnRunQueue(cap,t);
1601             return rtsFalse;  /* not actually GC'ing */
1602         }
1603     }
1604     
1605     debugTrace(DEBUG_sched,
1606                "--<< thread %ld (%s) stopped: HeapOverflow\n", 
1607                (long)t->id, whatNext_strs[t->what_next]);
1608
1609 #if defined(GRAN)
1610     ASSERT(!is_on_queue(t,CurrentProc));
1611 #elif defined(PARALLEL_HASKELL)
1612     /* Currently we emit a DESCHEDULE event before GC in GUM.
1613        ToDo: either add separate event to distinguish SYSTEM time from rest
1614        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1615     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1616         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1617                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1618         emitSchedule = rtsTrue;
1619     }
1620 #endif
1621       
1622     pushOnRunQueue(cap,t);
1623     return rtsTrue;
1624     /* actual GC is done at the end of the while loop in schedule() */
1625 }
1626
1627 /* -----------------------------------------------------------------------------
1628  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1629  * -------------------------------------------------------------------------- */
1630
1631 static void
1632 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1633 {
1634     debugTrace (DEBUG_sched,
1635                 "--<< thread %ld (%s) stopped, StackOverflow", 
1636                 (long)t->id, whatNext_strs[t->what_next]);
1637
1638     /* just adjust the stack for this thread, then pop it back
1639      * on the run queue.
1640      */
1641     { 
1642         /* enlarge the stack */
1643         StgTSO *new_t = threadStackOverflow(cap, t);
1644         
1645         /* The TSO attached to this Task may have moved, so update the
1646          * pointer to it.
1647          */
1648         if (task->tso == t) {
1649             task->tso = new_t;
1650         }
1651         pushOnRunQueue(cap,new_t);
1652     }
1653 }
1654
1655 /* -----------------------------------------------------------------------------
1656  * Handle a thread that returned to the scheduler with ThreadYielding
1657  * -------------------------------------------------------------------------- */
1658
1659 static rtsBool
1660 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1661 {
1662     // Reset the context switch flag.  We don't do this just before
1663     // running the thread, because that would mean we would lose ticks
1664     // during GC, which can lead to unfair scheduling (a thread hogs
1665     // the CPU because the tick always arrives during GC).  This way
1666     // penalises threads that do a lot of allocation, but that seems
1667     // better than the alternative.
1668     context_switch = 0;
1669     
1670     /* put the thread back on the run queue.  Then, if we're ready to
1671      * GC, check whether this is the last task to stop.  If so, wake
1672      * up the GC thread.  getThread will block during a GC until the
1673      * GC is finished.
1674      */
1675 #ifdef DEBUG
1676     if (t->what_next != prev_what_next) {
1677         debugTrace(DEBUG_sched,
1678                    "--<< thread %ld (%s) stopped to switch evaluators", 
1679                    (long)t->id, whatNext_strs[t->what_next]);
1680     } else {
1681         debugTrace(DEBUG_sched,
1682                    "--<< thread %ld (%s) stopped, yielding",
1683                    (long)t->id, whatNext_strs[t->what_next]);
1684     }
1685 #endif
1686     
1687     IF_DEBUG(sanity,
1688              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1689              checkTSO(t));
1690     ASSERT(t->link == END_TSO_QUEUE);
1691     
1692     // Shortcut if we're just switching evaluators: don't bother
1693     // doing stack squeezing (which can be expensive), just run the
1694     // thread.
1695     if (t->what_next != prev_what_next) {
1696         return rtsTrue;
1697     }
1698     
1699 #if defined(GRAN)
1700     ASSERT(!is_on_queue(t,CurrentProc));
1701       
1702     IF_DEBUG(sanity,
1703              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1704              checkThreadQsSanity(rtsTrue));
1705
1706 #endif
1707
1708     addToRunQueue(cap,t);
1709
1710 #if defined(GRAN)
1711     /* add a ContinueThread event to actually process the thread */
1712     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1713               ContinueThread,
1714               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1715     IF_GRAN_DEBUG(bq, 
1716                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1717                   G_EVENTQ(0);
1718                   G_CURR_THREADQ(0));
1719 #endif
1720     return rtsFalse;
1721 }
1722
1723 /* -----------------------------------------------------------------------------
1724  * Handle a thread that returned to the scheduler with ThreadBlocked
1725  * -------------------------------------------------------------------------- */
1726
1727 static void
1728 scheduleHandleThreadBlocked( StgTSO *t
1729 #if !defined(GRAN) && !defined(DEBUG)
1730     STG_UNUSED
1731 #endif
1732     )
1733 {
1734 #if defined(GRAN)
1735     IF_DEBUG(scheduler,
1736              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1737                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1738              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1739     
1740     // ??? needed; should emit block before
1741     IF_DEBUG(gran, 
1742              DumpGranEvent(GR_DESCHEDULE, t)); 
1743     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1744     /*
1745       ngoq Dogh!
1746       ASSERT(procStatus[CurrentProc]==Busy || 
1747       ((procStatus[CurrentProc]==Fetching) && 
1748       (t->block_info.closure!=(StgClosure*)NULL)));
1749       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1750       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1751       procStatus[CurrentProc]==Fetching)) 
1752       procStatus[CurrentProc] = Idle;
1753     */
1754 #elif defined(PAR)
1755     IF_DEBUG(scheduler,
1756              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1757                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1758     IF_PAR_DEBUG(bq,
1759                  
1760                  if (t->block_info.closure!=(StgClosure*)NULL) 
1761                  print_bq(t->block_info.closure));
1762     
1763     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1764     blockThread(t);
1765     
1766     /* whatever we schedule next, we must log that schedule */
1767     emitSchedule = rtsTrue;
1768     
1769 #else /* !GRAN */
1770
1771       // We don't need to do anything.  The thread is blocked, and it
1772       // has tidied up its stack and placed itself on whatever queue
1773       // it needs to be on.
1774
1775     // ASSERT(t->why_blocked != NotBlocked);
1776     // Not true: for example,
1777     //    - in THREADED_RTS, the thread may already have been woken
1778     //      up by another Capability.  This actually happens: try
1779     //      conc023 +RTS -N2.
1780     //    - the thread may have woken itself up already, because
1781     //      threadPaused() might have raised a blocked throwTo
1782     //      exception, see maybePerformBlockedException().
1783
1784 #ifdef DEBUG
1785     if (traceClass(DEBUG_sched)) {
1786         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1787                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1788         printThreadBlockage(t);
1789         debugTraceEnd();
1790     }
1791 #endif
1792     
1793     /* Only for dumping event to log file 
1794        ToDo: do I need this in GranSim, too?
1795        blockThread(t);
1796     */
1797 #endif
1798 }
1799
1800 /* -----------------------------------------------------------------------------
1801  * Handle a thread that returned to the scheduler with ThreadFinished
1802  * -------------------------------------------------------------------------- */
1803
1804 static rtsBool
1805 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1806 {
1807     /* Need to check whether this was a main thread, and if so,
1808      * return with the return value.
1809      *
1810      * We also end up here if the thread kills itself with an
1811      * uncaught exception, see Exception.cmm.
1812      */
1813     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1814                (unsigned long)t->id, whatNext_strs[t->what_next]);
1815
1816     /* Inform the Hpc that a thread has finished */
1817     hs_hpc_thread_finished_event(t);
1818
1819 #if defined(GRAN)
1820       endThread(t, CurrentProc); // clean-up the thread
1821 #elif defined(PARALLEL_HASKELL)
1822       /* For now all are advisory -- HWL */
1823       //if(t->priority==AdvisoryPriority) ??
1824       advisory_thread_count--; // JB: Caution with this counter, buggy!
1825       
1826 # if defined(DIST)
1827       if(t->dist.priority==RevalPriority)
1828         FinishReval(t);
1829 # endif
1830     
1831 # if defined(EDENOLD)
1832       // the thread could still have an outport... (BUG)
1833       if (t->eden.outport != -1) {
1834       // delete the outport for the tso which has finished...
1835         IF_PAR_DEBUG(eden_ports,
1836                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1837                               t->eden.outport, t->id));
1838         deleteOPT(t);
1839       }
1840       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1841       if (t->eden.epid != -1) {
1842         IF_PAR_DEBUG(eden_ports,
1843                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1844                            t->id, t->eden.epid));
1845         removeTSOfromProcess(t);
1846       }
1847 # endif 
1848
1849 # if defined(PAR)
1850       if (RtsFlags.ParFlags.ParStats.Full &&
1851           !RtsFlags.ParFlags.ParStats.Suppressed) 
1852         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1853
1854       //  t->par only contains statistics: left out for now...
1855       IF_PAR_DEBUG(fish,
1856                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1857                               t->id,t,t->par.sparkname));
1858 # endif
1859 #endif // PARALLEL_HASKELL
1860
1861       //
1862       // Check whether the thread that just completed was a bound
1863       // thread, and if so return with the result.  
1864       //
1865       // There is an assumption here that all thread completion goes
1866       // through this point; we need to make sure that if a thread
1867       // ends up in the ThreadKilled state, that it stays on the run
1868       // queue so it can be dealt with here.
1869       //
1870
1871       if (t->bound) {
1872
1873           if (t->bound != task) {
1874 #if !defined(THREADED_RTS)
1875               // Must be a bound thread that is not the topmost one.  Leave
1876               // it on the run queue until the stack has unwound to the
1877               // point where we can deal with this.  Leaving it on the run
1878               // queue also ensures that the garbage collector knows about
1879               // this thread and its return value (it gets dropped from the
1880               // all_threads list so there's no other way to find it).
1881               appendToRunQueue(cap,t);
1882               return rtsFalse;
1883 #else
1884               // this cannot happen in the threaded RTS, because a
1885               // bound thread can only be run by the appropriate Task.
1886               barf("finished bound thread that isn't mine");
1887 #endif
1888           }
1889
1890           ASSERT(task->tso == t);
1891
1892           if (t->what_next == ThreadComplete) {
1893               if (task->ret) {
1894                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1895                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1896               }
1897               task->stat = Success;
1898           } else {
1899               if (task->ret) {
1900                   *(task->ret) = NULL;
1901               }
1902               if (sched_state >= SCHED_INTERRUPTING) {
1903                   task->stat = Interrupted;
1904               } else {
1905                   task->stat = Killed;
1906               }
1907           }
1908 #ifdef DEBUG
1909           removeThreadLabel((StgWord)task->tso->id);
1910 #endif
1911           return rtsTrue; // tells schedule() to return
1912       }
1913
1914       return rtsFalse;
1915 }
1916
1917 /* -----------------------------------------------------------------------------
1918  * Perform a heap census
1919  * -------------------------------------------------------------------------- */
1920
1921 static rtsBool
1922 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1923 {
1924     // When we have +RTS -i0 and we're heap profiling, do a census at
1925     // every GC.  This lets us get repeatable runs for debugging.
1926     if (performHeapProfile ||
1927         (RtsFlags.ProfFlags.profileInterval==0 &&
1928          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1929         return rtsTrue;
1930     } else {
1931         return rtsFalse;
1932     }
1933 }
1934
1935 /* -----------------------------------------------------------------------------
1936  * Perform a garbage collection if necessary
1937  * -------------------------------------------------------------------------- */
1938
1939 static Capability *
1940 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1941 {
1942     StgTSO *t;
1943     rtsBool heap_census;
1944 #ifdef THREADED_RTS
1945     static volatile StgWord waiting_for_gc;
1946     rtsBool was_waiting;
1947     nat i;
1948 #endif
1949
1950 #ifdef THREADED_RTS
1951     // In order to GC, there must be no threads running Haskell code.
1952     // Therefore, the GC thread needs to hold *all* the capabilities,
1953     // and release them after the GC has completed.  
1954     //
1955     // This seems to be the simplest way: previous attempts involved
1956     // making all the threads with capabilities give up their
1957     // capabilities and sleep except for the *last* one, which
1958     // actually did the GC.  But it's quite hard to arrange for all
1959     // the other tasks to sleep and stay asleep.
1960     //
1961         
1962     was_waiting = cas(&waiting_for_gc, 0, 1);
1963     if (was_waiting) {
1964         do {
1965             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1966             if (cap) yieldCapability(&cap,task);
1967         } while (waiting_for_gc);
1968         return cap;  // NOTE: task->cap might have changed here
1969     }
1970
1971     for (i=0; i < n_capabilities; i++) {
1972         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1973         if (cap != &capabilities[i]) {
1974             Capability *pcap = &capabilities[i];
1975             // we better hope this task doesn't get migrated to
1976             // another Capability while we're waiting for this one.
1977             // It won't, because load balancing happens while we have
1978             // all the Capabilities, but even so it's a slightly
1979             // unsavoury invariant.
1980             task->cap = pcap;
1981             context_switch = 1;
1982             waitForReturnCapability(&pcap, task);
1983             if (pcap != &capabilities[i]) {
1984                 barf("scheduleDoGC: got the wrong capability");
1985             }
1986         }
1987     }
1988
1989     waiting_for_gc = rtsFalse;
1990 #endif
1991
1992     /* Kick any transactions which are invalid back to their
1993      * atomically frames.  When next scheduled they will try to
1994      * commit, this commit will fail and they will retry.
1995      */
1996     { 
1997         StgTSO *next;
1998
1999         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2000             if (t->what_next == ThreadRelocated) {
2001                 next = t->link;
2002             } else {
2003                 next = t->global_link;
2004                 
2005                 // This is a good place to check for blocked
2006                 // exceptions.  It might be the case that a thread is
2007                 // blocked on delivering an exception to a thread that
2008                 // is also blocked - we try to ensure that this
2009                 // doesn't happen in throwTo(), but it's too hard (or
2010                 // impossible) to close all the race holes, so we
2011                 // accept that some might get through and deal with
2012                 // them here.  A GC will always happen at some point,
2013                 // even if the system is otherwise deadlocked.
2014                 maybePerformBlockedException (&capabilities[0], t);
2015
2016                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2017                     if (!stmValidateNestOfTransactions (t -> trec)) {
2018                         debugTrace(DEBUG_sched | DEBUG_stm,
2019                                    "trec %p found wasting its time", t);
2020                         
2021                         // strip the stack back to the
2022                         // ATOMICALLY_FRAME, aborting the (nested)
2023                         // transaction, and saving the stack of any
2024                         // partially-evaluated thunks on the heap.
2025                         throwToSingleThreaded_(&capabilities[0], t, 
2026                                                NULL, rtsTrue, NULL);
2027                         
2028 #ifdef REG_R1
2029                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2030 #endif
2031                     }
2032                 }
2033             }
2034         }
2035     }
2036     
2037     // so this happens periodically:
2038     if (cap) scheduleCheckBlackHoles(cap);
2039     
2040     IF_DEBUG(scheduler, printAllThreads());
2041
2042     /*
2043      * We now have all the capabilities; if we're in an interrupting
2044      * state, then we should take the opportunity to delete all the
2045      * threads in the system.
2046      */
2047     if (sched_state >= SCHED_INTERRUPTING) {
2048         deleteAllThreads(&capabilities[0]);
2049         sched_state = SCHED_SHUTTING_DOWN;
2050     }
2051     
2052     heap_census = scheduleNeedHeapProfile(rtsTrue);
2053
2054     /* everybody back, start the GC.
2055      * Could do it in this thread, or signal a condition var
2056      * to do it in another thread.  Either way, we need to
2057      * broadcast on gc_pending_cond afterward.
2058      */
2059 #if defined(THREADED_RTS)
2060     debugTrace(DEBUG_sched, "doing GC");
2061 #endif
2062     GarbageCollect(force_major || heap_census);
2063     
2064     if (heap_census) {
2065         debugTrace(DEBUG_sched, "performing heap census");
2066         heapCensus();
2067         performHeapProfile = rtsFalse;
2068     }
2069
2070 #if defined(THREADED_RTS)
2071     // release our stash of capabilities.
2072     for (i = 0; i < n_capabilities; i++) {
2073         if (cap != &capabilities[i]) {
2074             task->cap = &capabilities[i];
2075             releaseCapability(&capabilities[i]);
2076         }
2077     }
2078     if (cap) {
2079         task->cap = cap;
2080     } else {
2081         task->cap = NULL;
2082     }
2083 #endif
2084
2085 #if defined(GRAN)
2086     /* add a ContinueThread event to continue execution of current thread */
2087     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2088               ContinueThread,
2089               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2090     IF_GRAN_DEBUG(bq, 
2091                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2092                   G_EVENTQ(0);
2093                   G_CURR_THREADQ(0));
2094 #endif /* GRAN */
2095
2096     return cap;
2097 }
2098
2099 /* ---------------------------------------------------------------------------
2100  * Singleton fork(). Do not copy any running threads.
2101  * ------------------------------------------------------------------------- */
2102
2103 pid_t
2104 forkProcess(HsStablePtr *entry
2105 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2106             STG_UNUSED
2107 #endif
2108            )
2109 {
2110 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2111     Task *task;
2112     pid_t pid;
2113     StgTSO* t,*next;
2114     Capability *cap;
2115     
2116 #if defined(THREADED_RTS)
2117     if (RtsFlags.ParFlags.nNodes > 1) {
2118         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2119         stg_exit(EXIT_FAILURE);
2120     }
2121 #endif
2122
2123     debugTrace(DEBUG_sched, "forking!");
2124     
2125     // ToDo: for SMP, we should probably acquire *all* the capabilities
2126     cap = rts_lock();
2127     
2128     pid = fork();
2129     
2130     if (pid) { // parent
2131         
2132         // just return the pid
2133         rts_unlock(cap);
2134         return pid;
2135         
2136     } else { // child
2137         
2138         // Now, all OS threads except the thread that forked are
2139         // stopped.  We need to stop all Haskell threads, including
2140         // those involved in foreign calls.  Also we need to delete
2141         // all Tasks, because they correspond to OS threads that are
2142         // now gone.
2143
2144         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2145             if (t->what_next == ThreadRelocated) {
2146                 next = t->link;
2147             } else {
2148                 next = t->global_link;
2149                 // don't allow threads to catch the ThreadKilled
2150                 // exception, but we do want to raiseAsync() because these
2151                 // threads may be evaluating thunks that we need later.
2152                 deleteThread_(cap,t);
2153             }
2154         }
2155         
2156         // Empty the run queue.  It seems tempting to let all the
2157         // killed threads stay on the run queue as zombies to be
2158         // cleaned up later, but some of them correspond to bound
2159         // threads for which the corresponding Task does not exist.
2160         cap->run_queue_hd = END_TSO_QUEUE;
2161         cap->run_queue_tl = END_TSO_QUEUE;
2162
2163         // Any suspended C-calling Tasks are no more, their OS threads
2164         // don't exist now:
2165         cap->suspended_ccalling_tasks = NULL;
2166
2167         // Empty the all_threads list.  Otherwise, the garbage
2168         // collector may attempt to resurrect some of these threads.
2169         all_threads = END_TSO_QUEUE;
2170
2171         // Wipe the task list, except the current Task.
2172         ACQUIRE_LOCK(&sched_mutex);
2173         for (task = all_tasks; task != NULL; task=task->all_link) {
2174             if (task != cap->running_task) {
2175                 discardTask(task);
2176             }
2177         }
2178         RELEASE_LOCK(&sched_mutex);
2179
2180 #if defined(THREADED_RTS)
2181         // Wipe our spare workers list, they no longer exist.  New
2182         // workers will be created if necessary.
2183         cap->spare_workers = NULL;
2184         cap->returning_tasks_hd = NULL;
2185         cap->returning_tasks_tl = NULL;
2186 #endif
2187
2188         // On Unix, all timers are reset in the child, so we need to start
2189         // the timer again.
2190         startTimer();
2191
2192         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2193         rts_checkSchedStatus("forkProcess",cap);
2194         
2195         rts_unlock(cap);
2196         hs_exit();                      // clean up and exit
2197         stg_exit(EXIT_SUCCESS);
2198     }
2199 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2200     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2201     return -1;
2202 #endif
2203 }
2204
2205 /* ---------------------------------------------------------------------------
2206  * Delete all the threads in the system
2207  * ------------------------------------------------------------------------- */
2208    
2209 static void
2210 deleteAllThreads ( Capability *cap )
2211 {
2212     // NOTE: only safe to call if we own all capabilities.
2213
2214     StgTSO* t, *next;
2215     debugTrace(DEBUG_sched,"deleting all threads");
2216     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2217         if (t->what_next == ThreadRelocated) {
2218             next = t->link;
2219         } else {
2220             next = t->global_link;
2221             deleteThread(cap,t);
2222         }
2223     }      
2224
2225     // The run queue now contains a bunch of ThreadKilled threads.  We
2226     // must not throw these away: the main thread(s) will be in there
2227     // somewhere, and the main scheduler loop has to deal with it.
2228     // Also, the run queue is the only thing keeping these threads from
2229     // being GC'd, and we don't want the "main thread has been GC'd" panic.
2230
2231 #if !defined(THREADED_RTS)
2232     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2233     ASSERT(sleeping_queue == END_TSO_QUEUE);
2234 #endif
2235 }
2236
2237 /* -----------------------------------------------------------------------------
2238    Managing the suspended_ccalling_tasks list.
2239    Locks required: sched_mutex
2240    -------------------------------------------------------------------------- */
2241
2242 STATIC_INLINE void
2243 suspendTask (Capability *cap, Task *task)
2244 {
2245     ASSERT(task->next == NULL && task->prev == NULL);
2246     task->next = cap->suspended_ccalling_tasks;
2247     task->prev = NULL;
2248     if (cap->suspended_ccalling_tasks) {
2249         cap->suspended_ccalling_tasks->prev = task;
2250     }
2251     cap->suspended_ccalling_tasks = task;
2252 }
2253
2254 STATIC_INLINE void
2255 recoverSuspendedTask (Capability *cap, Task *task)
2256 {
2257     if (task->prev) {
2258         task->prev->next = task->next;
2259     } else {
2260         ASSERT(cap->suspended_ccalling_tasks == task);
2261         cap->suspended_ccalling_tasks = task->next;
2262     }
2263     if (task->next) {
2264         task->next->prev = task->prev;
2265     }
2266     task->next = task->prev = NULL;
2267 }
2268
2269 /* ---------------------------------------------------------------------------
2270  * Suspending & resuming Haskell threads.
2271  * 
2272  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2273  * its capability before calling the C function.  This allows another
2274  * task to pick up the capability and carry on running Haskell
2275  * threads.  It also means that if the C call blocks, it won't lock
2276  * the whole system.
2277  *
2278  * The Haskell thread making the C call is put to sleep for the
2279  * duration of the call, on the susepended_ccalling_threads queue.  We
2280  * give out a token to the task, which it can use to resume the thread
2281  * on return from the C function.
2282  * ------------------------------------------------------------------------- */
2283    
2284 void *
2285 suspendThread (StgRegTable *reg)
2286 {
2287   Capability *cap;
2288   int saved_errno;
2289   StgTSO *tso;
2290   Task *task;
2291 #if mingw32_HOST_OS
2292   StgWord32 saved_winerror;
2293 #endif
2294
2295   saved_errno = errno;
2296 #if mingw32_HOST_OS
2297   saved_winerror = GetLastError();
2298 #endif
2299
2300   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2301    */
2302   cap = regTableToCapability(reg);
2303
2304   task = cap->running_task;
2305   tso = cap->r.rCurrentTSO;
2306
2307   debugTrace(DEBUG_sched, 
2308              "thread %lu did a safe foreign call", 
2309              (unsigned long)cap->r.rCurrentTSO->id);
2310
2311   // XXX this might not be necessary --SDM
2312   tso->what_next = ThreadRunGHC;
2313
2314   threadPaused(cap,tso);
2315
2316   if ((tso->flags & TSO_BLOCKEX) == 0)  {
2317       tso->why_blocked = BlockedOnCCall;
2318       tso->flags |= TSO_BLOCKEX;
2319       tso->flags &= ~TSO_INTERRUPTIBLE;
2320   } else {
2321       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2322   }
2323
2324   // Hand back capability
2325   task->suspended_tso = tso;
2326
2327   ACQUIRE_LOCK(&cap->lock);
2328
2329   suspendTask(cap,task);
2330   cap->in_haskell = rtsFalse;
2331   releaseCapability_(cap);
2332   
2333   RELEASE_LOCK(&cap->lock);
2334
2335 #if defined(THREADED_RTS)
2336   /* Preparing to leave the RTS, so ensure there's a native thread/task
2337      waiting to take over.
2338   */
2339   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2340 #endif
2341
2342   errno = saved_errno;
2343 #if mingw32_HOST_OS
2344   SetLastError(saved_winerror);
2345 #endif
2346   return task;
2347 }
2348
2349 StgRegTable *
2350 resumeThread (void *task_)
2351 {
2352     StgTSO *tso;
2353     Capability *cap;
2354     Task *task = task_;
2355     int saved_errno;
2356 #if mingw32_HOST_OS
2357     StgWord32 saved_winerror;
2358 #endif
2359
2360     saved_errno = errno;
2361 #if mingw32_HOST_OS
2362     saved_winerror = GetLastError();
2363 #endif
2364
2365     cap = task->cap;
2366     // Wait for permission to re-enter the RTS with the result.
2367     waitForReturnCapability(&cap,task);
2368     // we might be on a different capability now... but if so, our
2369     // entry on the suspended_ccalling_tasks list will also have been
2370     // migrated.
2371
2372     // Remove the thread from the suspended list
2373     recoverSuspendedTask(cap,task);
2374
2375     tso = task->suspended_tso;
2376     task->suspended_tso = NULL;
2377     tso->link = END_TSO_QUEUE;
2378     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2379     
2380     if (tso->why_blocked == BlockedOnCCall) {
2381         awakenBlockedExceptionQueue(cap,tso);
2382         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2383     }
2384     
2385     /* Reset blocking status */
2386     tso->why_blocked  = NotBlocked;
2387     
2388     cap->r.rCurrentTSO = tso;
2389     cap->in_haskell = rtsTrue;
2390     errno = saved_errno;
2391 #if mingw32_HOST_OS
2392     SetLastError(saved_winerror);
2393 #endif
2394
2395     /* We might have GC'd, mark the TSO dirty again */
2396     dirtyTSO(tso);
2397
2398     IF_DEBUG(sanity, checkTSO(tso));
2399
2400     return &cap->r;
2401 }
2402
2403 /* ---------------------------------------------------------------------------
2404  * scheduleThread()
2405  *
2406  * scheduleThread puts a thread on the end  of the runnable queue.
2407  * This will usually be done immediately after a thread is created.
2408  * The caller of scheduleThread must create the thread using e.g.
2409  * createThread and push an appropriate closure
2410  * on this thread's stack before the scheduler is invoked.
2411  * ------------------------------------------------------------------------ */
2412
2413 void
2414 scheduleThread(Capability *cap, StgTSO *tso)
2415 {
2416     // The thread goes at the *end* of the run-queue, to avoid possible
2417     // starvation of any threads already on the queue.
2418     appendToRunQueue(cap,tso);
2419 }
2420
2421 void
2422 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2423 {
2424 #if defined(THREADED_RTS)
2425     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2426                               // move this thread from now on.
2427     cpu %= RtsFlags.ParFlags.nNodes;
2428     if (cpu == cap->no) {
2429         appendToRunQueue(cap,tso);
2430     } else {
2431         migrateThreadToCapability_lock(&capabilities[cpu],tso);
2432     }
2433 #else
2434     appendToRunQueue(cap,tso);
2435 #endif
2436 }
2437
2438 Capability *
2439 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2440 {
2441     Task *task;
2442
2443     // We already created/initialised the Task
2444     task = cap->running_task;
2445
2446     // This TSO is now a bound thread; make the Task and TSO
2447     // point to each other.
2448     tso->bound = task;
2449     tso->cap = cap;
2450
2451     task->tso = tso;
2452     task->ret = ret;
2453     task->stat = NoStatus;
2454
2455     appendToRunQueue(cap,tso);
2456
2457     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2458
2459 #if defined(GRAN)
2460     /* GranSim specific init */
2461     CurrentTSO = m->tso;                // the TSO to run
2462     procStatus[MainProc] = Busy;        // status of main PE
2463     CurrentProc = MainProc;             // PE to run it on
2464 #endif
2465
2466     cap = schedule(cap,task);
2467
2468     ASSERT(task->stat != NoStatus);
2469     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2470
2471     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2472     return cap;
2473 }
2474
2475 /* ----------------------------------------------------------------------------
2476  * Starting Tasks
2477  * ------------------------------------------------------------------------- */
2478
2479 #if defined(THREADED_RTS)
2480 void
2481 workerStart(Task *task)
2482 {
2483     Capability *cap;
2484
2485     // See startWorkerTask().
2486     ACQUIRE_LOCK(&task->lock);
2487     cap = task->cap;
2488     RELEASE_LOCK(&task->lock);
2489
2490     // set the thread-local pointer to the Task:
2491     taskEnter(task);
2492
2493     // schedule() runs without a lock.
2494     cap = schedule(cap,task);
2495
2496     // On exit from schedule(), we have a Capability.
2497     releaseCapability(cap);
2498     workerTaskStop(task);
2499 }
2500 #endif
2501
2502 /* ---------------------------------------------------------------------------
2503  * initScheduler()
2504  *
2505  * Initialise the scheduler.  This resets all the queues - if the
2506  * queues contained any threads, they'll be garbage collected at the
2507  * next pass.
2508  *
2509  * ------------------------------------------------------------------------ */
2510
2511 void 
2512 initScheduler(void)
2513 {
2514 #if defined(GRAN)
2515   nat i;
2516   for (i=0; i<=MAX_PROC; i++) {
2517     run_queue_hds[i]      = END_TSO_QUEUE;
2518     run_queue_tls[i]      = END_TSO_QUEUE;
2519     blocked_queue_hds[i]  = END_TSO_QUEUE;
2520     blocked_queue_tls[i]  = END_TSO_QUEUE;
2521     ccalling_threadss[i]  = END_TSO_QUEUE;
2522     blackhole_queue[i]    = END_TSO_QUEUE;
2523     sleeping_queue        = END_TSO_QUEUE;
2524   }
2525 #elif !defined(THREADED_RTS)
2526   blocked_queue_hd  = END_TSO_QUEUE;
2527   blocked_queue_tl  = END_TSO_QUEUE;
2528   sleeping_queue    = END_TSO_QUEUE;
2529 #endif
2530
2531   blackhole_queue   = END_TSO_QUEUE;
2532   all_threads       = END_TSO_QUEUE;
2533
2534   context_switch = 0;
2535   sched_state    = SCHED_RUNNING;
2536
2537 #if defined(THREADED_RTS)
2538   /* Initialise the mutex and condition variables used by
2539    * the scheduler. */
2540   initMutex(&sched_mutex);
2541 #endif
2542   
2543   ACQUIRE_LOCK(&sched_mutex);
2544
2545   /* A capability holds the state a native thread needs in
2546    * order to execute STG code. At least one capability is
2547    * floating around (only THREADED_RTS builds have more than one).
2548    */
2549   initCapabilities();
2550
2551   initTaskManager();
2552
2553 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2554   initSparkPools();
2555 #endif
2556
2557 #if defined(THREADED_RTS)
2558   /*
2559    * Eagerly start one worker to run each Capability, except for
2560    * Capability 0.  The idea is that we're probably going to start a
2561    * bound thread on Capability 0 pretty soon, so we don't want a
2562    * worker task hogging it.
2563    */
2564   { 
2565       nat i;
2566       Capability *cap;
2567       for (i = 1; i < n_capabilities; i++) {
2568           cap = &capabilities[i];
2569           ACQUIRE_LOCK(&cap->lock);
2570           startWorkerTask(cap, workerStart);
2571           RELEASE_LOCK(&cap->lock);
2572       }
2573   }
2574 #endif
2575
2576   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2577
2578   RELEASE_LOCK(&sched_mutex);
2579 }
2580
2581 void
2582 exitScheduler( void )
2583 {
2584     Task *task = NULL;
2585
2586 #if defined(THREADED_RTS)
2587     ACQUIRE_LOCK(&sched_mutex);
2588     task = newBoundTask();
2589     RELEASE_LOCK(&sched_mutex);
2590 #endif
2591
2592     // If we haven't killed all the threads yet, do it now.
2593     if (sched_state < SCHED_SHUTTING_DOWN) {
2594         sched_state = SCHED_INTERRUPTING;
2595         scheduleDoGC(NULL,task,rtsFalse);    
2596     }
2597     sched_state = SCHED_SHUTTING_DOWN;
2598
2599 #if defined(THREADED_RTS)
2600     { 
2601         nat i;
2602         
2603         for (i = 0; i < n_capabilities; i++) {
2604             shutdownCapability(&capabilities[i], task);
2605         }
2606         boundTaskExiting(task);
2607         stopTaskManager();
2608     }
2609 #else
2610     freeCapability(&MainCapability);
2611 #endif
2612 }
2613
2614 void
2615 freeScheduler( void )
2616 {
2617     freeTaskManager();
2618     if (n_capabilities != 1) {
2619         stgFree(capabilities);
2620     }
2621 #if defined(THREADED_RTS)
2622     closeMutex(&sched_mutex);
2623 #endif
2624 }
2625
2626 /* ---------------------------------------------------------------------------
2627    Where are the roots that we know about?
2628
2629         - all the threads on the runnable queue
2630         - all the threads on the blocked queue
2631         - all the threads on the sleeping queue
2632         - all the thread currently executing a _ccall_GC
2633         - all the "main threads"
2634      
2635    ------------------------------------------------------------------------ */
2636
2637 /* This has to be protected either by the scheduler monitor, or by the
2638         garbage collection monitor (probably the latter).
2639         KH @ 25/10/99
2640 */
2641
2642 void
2643 GetRoots( evac_fn evac )
2644 {
2645     nat i;
2646     Capability *cap;
2647     Task *task;
2648
2649 #if defined(GRAN)
2650     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2651         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2652             evac((StgClosure **)&run_queue_hds[i]);
2653         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2654             evac((StgClosure **)&run_queue_tls[i]);
2655         
2656         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2657             evac((StgClosure **)&blocked_queue_hds[i]);
2658         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2659             evac((StgClosure **)&blocked_queue_tls[i]);
2660         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2661             evac((StgClosure **)&ccalling_threads[i]);
2662     }
2663
2664     markEventQueue();
2665
2666 #else /* !GRAN */
2667
2668     for (i = 0; i < n_capabilities; i++) {
2669         cap = &capabilities[i];
2670         evac((StgClosure **)(void *)&cap->run_queue_hd);
2671         evac((StgClosure **)(void *)&cap->run_queue_tl);
2672 #if defined(THREADED_RTS)
2673         evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2674         evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2675 #endif
2676         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2677              task=task->next) {
2678             debugTrace(DEBUG_sched,
2679                        "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
2680             evac((StgClosure **)(void *)&task->suspended_tso);
2681         }
2682
2683     }
2684     
2685
2686 #if !defined(THREADED_RTS)
2687     evac((StgClosure **)(void *)&blocked_queue_hd);
2688     evac((StgClosure **)(void *)&blocked_queue_tl);
2689     evac((StgClosure **)(void *)&sleeping_queue);
2690 #endif 
2691 #endif
2692
2693     // evac((StgClosure **)&blackhole_queue);
2694
2695 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2696     markSparkQueue(evac);
2697 #endif
2698     
2699 #if defined(RTS_USER_SIGNALS)
2700     // mark the signal handlers (signals should be already blocked)
2701     if (RtsFlags.MiscFlags.install_signal_handlers) {
2702         markSignalHandlers(evac);
2703     }
2704 #endif
2705 }
2706
2707 /* -----------------------------------------------------------------------------
2708    performGC
2709
2710    This is the interface to the garbage collector from Haskell land.
2711    We provide this so that external C code can allocate and garbage
2712    collect when called from Haskell via _ccall_GC.
2713    -------------------------------------------------------------------------- */
2714
2715 static void
2716 performGC_(rtsBool force_major)
2717 {
2718     Task *task;
2719     // We must grab a new Task here, because the existing Task may be
2720     // associated with a particular Capability, and chained onto the 
2721     // suspended_ccalling_tasks queue.
2722     ACQUIRE_LOCK(&sched_mutex);
2723     task = newBoundTask();
2724     RELEASE_LOCK(&sched_mutex);
2725     scheduleDoGC(NULL,task,force_major);
2726     boundTaskExiting(task);
2727 }
2728
2729 void
2730 performGC(void)
2731 {
2732     performGC_(rtsFalse);
2733 }
2734
2735 void
2736 performMajorGC(void)
2737 {
2738     performGC_(rtsTrue);
2739 }
2740
2741 /* -----------------------------------------------------------------------------
2742    Stack overflow
2743
2744    If the thread has reached its maximum stack size, then raise the
2745    StackOverflow exception in the offending thread.  Otherwise
2746    relocate the TSO into a larger chunk of memory and adjust its stack
2747    size appropriately.
2748    -------------------------------------------------------------------------- */
2749
2750 static StgTSO *
2751 threadStackOverflow(Capability *cap, StgTSO *tso)
2752 {
2753   nat new_stack_size, stack_words;
2754   lnat new_tso_size;
2755   StgPtr new_sp;
2756   StgTSO *dest;
2757
2758   IF_DEBUG(sanity,checkTSO(tso));
2759
2760   // don't allow throwTo() to modify the blocked_exceptions queue
2761   // while we are moving the TSO:
2762   lockClosure((StgClosure *)tso);
2763
2764   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2765       // NB. never raise a StackOverflow exception if the thread is
2766       // inside Control.Exceptino.block.  It is impractical to protect
2767       // against stack overflow exceptions, since virtually anything
2768       // can raise one (even 'catch'), so this is the only sensible
2769       // thing to do here.  See bug #767.
2770
2771       debugTrace(DEBUG_gc,
2772                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2773                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2774       IF_DEBUG(gc,
2775                /* If we're debugging, just print out the top of the stack */
2776                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2777                                                 tso->sp+64)));
2778
2779       // Send this thread the StackOverflow exception
2780       unlockTSO(tso);
2781       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2782       return tso;
2783   }
2784
2785   /* Try to double the current stack size.  If that takes us over the
2786    * maximum stack size for this thread, then use the maximum instead.
2787    * Finally round up so the TSO ends up as a whole number of blocks.
2788    */
2789   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2790   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2791                                        TSO_STRUCT_SIZE)/sizeof(W_);
2792   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2793   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2794
2795   debugTrace(DEBUG_sched, 
2796              "increasing stack size from %ld words to %d.",
2797              (long)tso->stack_size, new_stack_size);
2798
2799   dest = (StgTSO *)allocate(new_tso_size);
2800   TICK_ALLOC_TSO(new_stack_size,0);
2801
2802   /* copy the TSO block and the old stack into the new area */
2803   memcpy(dest,tso,TSO_STRUCT_SIZE);
2804   stack_words = tso->stack + tso->stack_size - tso->sp;
2805   new_sp = (P_)dest + new_tso_size - stack_words;
2806   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2807
2808   /* relocate the stack pointers... */
2809   dest->sp         = new_sp;
2810   dest->stack_size = new_stack_size;
2811         
2812   /* Mark the old TSO as relocated.  We have to check for relocated
2813    * TSOs in the garbage collector and any primops that deal with TSOs.
2814    *
2815    * It's important to set the sp value to just beyond the end
2816    * of the stack, so we don't attempt to scavenge any part of the
2817    * dead TSO's stack.
2818    */
2819   tso->what_next = ThreadRelocated;
2820   tso->link = dest;
2821   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2822   tso->why_blocked = NotBlocked;
2823
2824   IF_PAR_DEBUG(verbose,
2825                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2826                      tso->id, tso, tso->stack_size);
2827                /* If we're debugging, just print out the top of the stack */
2828                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2829                                                 tso->sp+64)));
2830   
2831   unlockTSO(dest);
2832   unlockTSO(tso);
2833
2834   IF_DEBUG(sanity,checkTSO(dest));
2835 #if 0
2836   IF_DEBUG(scheduler,printTSO(dest));
2837 #endif
2838
2839   return dest;
2840 }
2841
2842 /* ---------------------------------------------------------------------------
2843    Interrupt execution
2844    - usually called inside a signal handler so it mustn't do anything fancy.   
2845    ------------------------------------------------------------------------ */
2846
2847 void
2848 interruptStgRts(void)
2849 {
2850     sched_state = SCHED_INTERRUPTING;
2851     context_switch = 1;
2852     wakeUpRts();
2853 }
2854
2855 /* -----------------------------------------------------------------------------
2856    Wake up the RTS
2857    
2858    This function causes at least one OS thread to wake up and run the
2859    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2860    an external event has arrived that may need servicing (eg. a
2861    keyboard interrupt).
2862
2863    In the single-threaded RTS we don't do anything here; we only have
2864    one thread anyway, and the event that caused us to want to wake up
2865    will have interrupted any blocking system call in progress anyway.
2866    -------------------------------------------------------------------------- */
2867
2868 void
2869 wakeUpRts(void)
2870 {
2871 #if defined(THREADED_RTS)
2872     // This forces the IO Manager thread to wakeup, which will
2873     // in turn ensure that some OS thread wakes up and runs the
2874     // scheduler loop, which will cause a GC and deadlock check.
2875     ioManagerWakeup();
2876 #endif
2877 }
2878
2879 /* -----------------------------------------------------------------------------
2880  * checkBlackHoles()
2881  *
2882  * Check the blackhole_queue for threads that can be woken up.  We do
2883  * this periodically: before every GC, and whenever the run queue is
2884  * empty.
2885  *
2886  * An elegant solution might be to just wake up all the blocked
2887  * threads with awakenBlockedQueue occasionally: they'll go back to
2888  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2889  * doesn't give us a way to tell whether we've actually managed to
2890  * wake up any threads, so we would be busy-waiting.
2891  *
2892  * -------------------------------------------------------------------------- */
2893
2894 static rtsBool
2895 checkBlackHoles (Capability *cap)
2896 {
2897     StgTSO **prev, *t;
2898     rtsBool any_woke_up = rtsFalse;
2899     StgHalfWord type;
2900
2901     // blackhole_queue is global:
2902     ASSERT_LOCK_HELD(&sched_mutex);
2903
2904     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2905
2906     // ASSUMES: sched_mutex
2907     prev = &blackhole_queue;
2908     t = blackhole_queue;
2909     while (t != END_TSO_QUEUE) {
2910         ASSERT(t->why_blocked == BlockedOnBlackHole);
2911         type = get_itbl(t->block_info.closure)->type;
2912         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2913             IF_DEBUG(sanity,checkTSO(t));
2914             t = unblockOne(cap, t);
2915             // urk, the threads migrate to the current capability
2916             // here, but we'd like to keep them on the original one.
2917             *prev = t;
2918             any_woke_up = rtsTrue;
2919         } else {
2920             prev = &t->link;
2921             t = t->link;
2922         }
2923     }
2924
2925     return any_woke_up;
2926 }
2927
2928 /* -----------------------------------------------------------------------------
2929    Deleting threads
2930
2931    This is used for interruption (^C) and forking, and corresponds to
2932    raising an exception but without letting the thread catch the
2933    exception.
2934    -------------------------------------------------------------------------- */
2935
2936 static void 
2937 deleteThread (Capability *cap, StgTSO *tso)
2938 {
2939     // NOTE: must only be called on a TSO that we have exclusive
2940     // access to, because we will call throwToSingleThreaded() below.
2941     // The TSO must be on the run queue of the Capability we own, or 
2942     // we must own all Capabilities.
2943
2944     if (tso->why_blocked != BlockedOnCCall &&
2945         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2946         throwToSingleThreaded(cap,tso,NULL);
2947     }
2948 }
2949
2950 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2951 static void 
2952 deleteThread_(Capability *cap, StgTSO *tso)
2953 { // for forkProcess only:
2954   // like deleteThread(), but we delete threads in foreign calls, too.
2955
2956     if (tso->why_blocked == BlockedOnCCall ||
2957         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2958         unblockOne(cap,tso);
2959         tso->what_next = ThreadKilled;
2960     } else {
2961         deleteThread(cap,tso);
2962     }
2963 }
2964 #endif
2965
2966 /* -----------------------------------------------------------------------------
2967    raiseExceptionHelper
2968    
2969    This function is called by the raise# primitve, just so that we can
2970    move some of the tricky bits of raising an exception from C-- into
2971    C.  Who knows, it might be a useful re-useable thing here too.
2972    -------------------------------------------------------------------------- */
2973
2974 StgWord
2975 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2976 {
2977     Capability *cap = regTableToCapability(reg);
2978     StgThunk *raise_closure = NULL;
2979     StgPtr p, next;
2980     StgRetInfoTable *info;
2981     //
2982     // This closure represents the expression 'raise# E' where E
2983     // is the exception raise.  It is used to overwrite all the
2984     // thunks which are currently under evaluataion.
2985     //
2986
2987     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2988     // LDV profiling: stg_raise_info has THUNK as its closure
2989     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2990     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2991     // 1 does not cause any problem unless profiling is performed.
2992     // However, when LDV profiling goes on, we need to linearly scan
2993     // small object pool, where raise_closure is stored, so we should
2994     // use MIN_UPD_SIZE.
2995     //
2996     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2997     //                                 sizeofW(StgClosure)+1);
2998     //
2999
3000     //
3001     // Walk up the stack, looking for the catch frame.  On the way,
3002     // we update any closures pointed to from update frames with the
3003     // raise closure that we just built.
3004     //
3005     p = tso->sp;
3006     while(1) {
3007         info = get_ret_itbl((StgClosure *)p);
3008         next = p + stack_frame_sizeW((StgClosure *)p);
3009         switch (info->i.type) {
3010             
3011         case UPDATE_FRAME:
3012             // Only create raise_closure if we need to.
3013             if (raise_closure == NULL) {
3014                 raise_closure = 
3015                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3016                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3017                 raise_closure->payload[0] = exception;
3018             }
3019             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3020             p = next;
3021             continue;
3022
3023         case ATOMICALLY_FRAME:
3024             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
3025             tso->sp = p;
3026             return ATOMICALLY_FRAME;
3027             
3028         case CATCH_FRAME:
3029             tso->sp = p;
3030             return CATCH_FRAME;
3031
3032         case CATCH_STM_FRAME:
3033             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3034             tso->sp = p;
3035             return CATCH_STM_FRAME;
3036             
3037         case STOP_FRAME:
3038             tso->sp = p;
3039             return STOP_FRAME;
3040
3041         case CATCH_RETRY_FRAME:
3042         default:
3043             p = next; 
3044             continue;
3045         }
3046     }
3047 }
3048
3049
3050 /* -----------------------------------------------------------------------------
3051    findRetryFrameHelper
3052
3053    This function is called by the retry# primitive.  It traverses the stack
3054    leaving tso->sp referring to the frame which should handle the retry.  
3055
3056    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3057    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3058
3059    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3060    create) because retries are not considered to be exceptions, despite the
3061    similar implementation.
3062
3063    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3064    not be created within memory transactions.
3065    -------------------------------------------------------------------------- */
3066
3067 StgWord
3068 findRetryFrameHelper (StgTSO *tso)
3069 {
3070   StgPtr           p, next;
3071   StgRetInfoTable *info;
3072
3073   p = tso -> sp;
3074   while (1) {
3075     info = get_ret_itbl((StgClosure *)p);
3076     next = p + stack_frame_sizeW((StgClosure *)p);
3077     switch (info->i.type) {
3078       
3079     case ATOMICALLY_FRAME:
3080         debugTrace(DEBUG_stm,
3081                    "found ATOMICALLY_FRAME at %p during retry", p);
3082         tso->sp = p;
3083         return ATOMICALLY_FRAME;
3084       
3085     case CATCH_RETRY_FRAME:
3086         debugTrace(DEBUG_stm,
3087                    "found CATCH_RETRY_FRAME at %p during retrry", p);
3088         tso->sp = p;
3089         return CATCH_RETRY_FRAME;
3090       
3091     case CATCH_STM_FRAME: {
3092         debugTrace(DEBUG_stm,
3093                    "found CATCH_STM_FRAME at %p during retry", p);
3094         StgTRecHeader *trec = tso -> trec;
3095         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3096         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3097         stmAbortTransaction(tso -> cap, trec);
3098         stmFreeAbortedTRec(tso -> cap, trec);
3099         tso -> trec = outer;
3100         p = next; 
3101         continue;
3102     }
3103       
3104
3105     default:
3106       ASSERT(info->i.type != CATCH_FRAME);
3107       ASSERT(info->i.type != STOP_FRAME);
3108       p = next; 
3109       continue;
3110     }
3111   }
3112 }
3113
3114 /* -----------------------------------------------------------------------------
3115    resurrectThreads is called after garbage collection on the list of
3116    threads found to be garbage.  Each of these threads will be woken
3117    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3118    on an MVar, or NonTermination if the thread was blocked on a Black
3119    Hole.
3120
3121    Locks: assumes we hold *all* the capabilities.
3122    -------------------------------------------------------------------------- */
3123
3124 void
3125 resurrectThreads (StgTSO *threads)
3126 {
3127     StgTSO *tso, *next;
3128     Capability *cap;
3129
3130     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3131         next = tso->global_link;
3132         tso->global_link = all_threads;
3133         all_threads = tso;
3134         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3135         
3136         // Wake up the thread on the Capability it was last on
3137         cap = tso->cap;
3138         
3139         switch (tso->why_blocked) {
3140         case BlockedOnMVar:
3141         case BlockedOnException:
3142             /* Called by GC - sched_mutex lock is currently held. */
3143             throwToSingleThreaded(cap, tso,
3144                                   (StgClosure *)BlockedOnDeadMVar_closure);
3145             break;
3146         case BlockedOnBlackHole:
3147             throwToSingleThreaded(cap, tso,
3148                                   (StgClosure *)NonTermination_closure);
3149             break;
3150         case BlockedOnSTM:
3151             throwToSingleThreaded(cap, tso,
3152                                   (StgClosure *)BlockedIndefinitely_closure);
3153             break;
3154         case NotBlocked:
3155             /* This might happen if the thread was blocked on a black hole
3156              * belonging to a thread that we've just woken up (raiseAsync
3157              * can wake up threads, remember...).
3158              */
3159             continue;
3160         default:
3161             barf("resurrectThreads: thread blocked in a strange way");
3162         }
3163     }
3164 }