Cleanup Hpc sub-system, remove hpc-tracer implementation.
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "OSThreads.h"
15 #include "Storage.h"
16 #include "StgRun.h"
17 #include "Hooks.h"
18 #include "Schedule.h"
19 #include "StgMiscClosures.h"
20 #include "Interpreter.h"
21 #include "Printer.h"
22 #include "RtsSignals.h"
23 #include "Sanity.h"
24 #include "Stats.h"
25 #include "STM.h"
26 #include "Timer.h"
27 #include "Prelude.h"
28 #include "ThreadLabels.h"
29 #include "LdvProfile.h"
30 #include "Updates.h"
31 #include "Proftimer.h"
32 #include "ProfHeap.h"
33 #if defined(GRAN) || defined(PARALLEL_HASKELL)
34 # include "GranSimRts.h"
35 # include "GranSim.h"
36 # include "ParallelRts.h"
37 # include "Parallel.h"
38 # include "ParallelDebug.h"
39 # include "FetchMe.h"
40 # include "HLC.h"
41 #endif
42 #include "Sparks.h"
43 #include "Capability.h"
44 #include "Task.h"
45 #include "AwaitEvent.h"
46 #if defined(mingw32_HOST_OS)
47 #include "win32/IOManager.h"
48 #endif
49 #include "Trace.h"
50 #include "RaiseAsync.h"
51 #include "Threads.h"
52 #include "ThrIOManager.h"
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 /* -----------------------------------------------------------------------------
76  * Global variables
77  * -------------------------------------------------------------------------- */
78
79 #if defined(GRAN)
80
81 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
82 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
83
84 /* 
85    In GranSim we have a runnable and a blocked queue for each processor.
86    In order to minimise code changes new arrays run_queue_hds/tls
87    are created. run_queue_hd is then a short cut (macro) for
88    run_queue_hds[CurrentProc] (see GranSim.h).
89    -- HWL
90 */
91 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
92 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
93 StgTSO *ccalling_threadss[MAX_PROC];
94 /* We use the same global list of threads (all_threads) in GranSim as in
95    the std RTS (i.e. we are cheating). However, we don't use this list in
96    the GranSim specific code at the moment (so we are only potentially
97    cheating).  */
98
99 #else /* !GRAN */
100
101 #if !defined(THREADED_RTS)
102 // Blocked/sleeping thrads
103 StgTSO *blocked_queue_hd = NULL;
104 StgTSO *blocked_queue_tl = NULL;
105 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
106 #endif
107
108 /* Threads blocked on blackholes.
109  * LOCK: sched_mutex+capability, or all capabilities
110  */
111 StgTSO *blackhole_queue = NULL;
112 #endif
113
114 /* The blackhole_queue should be checked for threads to wake up.  See
115  * Schedule.h for more thorough comment.
116  * LOCK: none (doesn't matter if we miss an update)
117  */
118 rtsBool blackholes_need_checking = rtsFalse;
119
120 /* Linked list of all threads.
121  * Used for detecting garbage collected threads.
122  * LOCK: sched_mutex+capability, or all capabilities
123  */
124 StgTSO *all_threads = NULL;
125
126 /* flag set by signal handler to precipitate a context switch
127  * LOCK: none (just an advisory flag)
128  */
129 int context_switch = 0;
130
131 /* flag that tracks whether we have done any execution in this time slice.
132  * LOCK: currently none, perhaps we should lock (but needs to be
133  * updated in the fast path of the scheduler).
134  */
135 nat recent_activity = ACTIVITY_YES;
136
137 /* if this flag is set as well, give up execution
138  * LOCK: none (changes once, from false->true)
139  */
140 rtsBool sched_state = SCHED_RUNNING;
141
142 #if defined(GRAN)
143 StgTSO *CurrentTSO;
144 #endif
145
146 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
147  *  exists - earlier gccs apparently didn't.
148  *  -= chak
149  */
150 StgTSO dummy_tso;
151
152 /*
153  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
154  * in an MT setting, needed to signal that a worker thread shouldn't hang around
155  * in the scheduler when it is out of work.
156  */
157 rtsBool shutting_down_scheduler = rtsFalse;
158
159 /*
160  * This mutex protects most of the global scheduler data in
161  * the THREADED_RTS runtime.
162  */
163 #if defined(THREADED_RTS)
164 Mutex sched_mutex;
165 #endif
166
167 #if defined(PARALLEL_HASKELL)
168 StgTSO *LastTSO;
169 rtsTime TimeOfLastYield;
170 rtsBool emitSchedule = rtsTrue;
171 #endif
172
173 #if !defined(mingw32_HOST_OS)
174 #define FORKPROCESS_PRIMOP_SUPPORTED
175 #endif
176
177 /* -----------------------------------------------------------------------------
178  * static function prototypes
179  * -------------------------------------------------------------------------- */
180
181 static Capability *schedule (Capability *initialCapability, Task *task);
182
183 //
184 // These function all encapsulate parts of the scheduler loop, and are
185 // abstracted only to make the structure and control flow of the
186 // scheduler clearer.
187 //
188 static void schedulePreLoop (void);
189 #if defined(THREADED_RTS)
190 static void schedulePushWork(Capability *cap, Task *task);
191 #endif
192 static void scheduleStartSignalHandlers (Capability *cap);
193 static void scheduleCheckBlockedThreads (Capability *cap);
194 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
195 static void scheduleCheckBlackHoles (Capability *cap);
196 static void scheduleDetectDeadlock (Capability *cap, Task *task);
197 #if defined(GRAN)
198 static StgTSO *scheduleProcessEvent(rtsEvent *event);
199 #endif
200 #if defined(PARALLEL_HASKELL)
201 static StgTSO *scheduleSendPendingMessages(void);
202 static void scheduleActivateSpark(void);
203 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
204 #endif
205 #if defined(PAR) || defined(GRAN)
206 static void scheduleGranParReport(void);
207 #endif
208 static void schedulePostRunThread(void);
209 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
210 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
211                                          StgTSO *t);
212 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
213                                     nat prev_what_next );
214 static void scheduleHandleThreadBlocked( StgTSO *t );
215 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
216                                              StgTSO *t );
217 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
218 static Capability *scheduleDoGC(Capability *cap, Task *task,
219                                 rtsBool force_major);
220
221 static rtsBool checkBlackHoles(Capability *cap);
222
223 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
224
225 static void deleteThread (Capability *cap, StgTSO *tso);
226 static void deleteAllThreads (Capability *cap);
227
228 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
229 static void deleteThread_(Capability *cap, StgTSO *tso);
230 #endif
231
232 #if defined(PARALLEL_HASKELL)
233 StgTSO * createSparkThread(rtsSpark spark);
234 StgTSO * activateSpark (rtsSpark spark);  
235 #endif
236
237 #ifdef DEBUG
238 static char *whatNext_strs[] = {
239   "(unknown)",
240   "ThreadRunGHC",
241   "ThreadInterpret",
242   "ThreadKilled",
243   "ThreadRelocated",
244   "ThreadComplete"
245 };
246 #endif
247
248 /* -----------------------------------------------------------------------------
249  * Putting a thread on the run queue: different scheduling policies
250  * -------------------------------------------------------------------------- */
251
252 STATIC_INLINE void
253 addToRunQueue( Capability *cap, StgTSO *t )
254 {
255 #if defined(PARALLEL_HASKELL)
256     if (RtsFlags.ParFlags.doFairScheduling) { 
257         // this does round-robin scheduling; good for concurrency
258         appendToRunQueue(cap,t);
259     } else {
260         // this does unfair scheduling; good for parallelism
261         pushOnRunQueue(cap,t);
262     }
263 #else
264     // this does round-robin scheduling; good for concurrency
265     appendToRunQueue(cap,t);
266 #endif
267 }
268
269 /* ---------------------------------------------------------------------------
270    Main scheduling loop.
271
272    We use round-robin scheduling, each thread returning to the
273    scheduler loop when one of these conditions is detected:
274
275       * out of heap space
276       * timer expires (thread yields)
277       * thread blocks
278       * thread ends
279       * stack overflow
280
281    GRAN version:
282      In a GranSim setup this loop iterates over the global event queue.
283      This revolves around the global event queue, which determines what 
284      to do next. Therefore, it's more complicated than either the 
285      concurrent or the parallel (GUM) setup.
286
287    GUM version:
288      GUM iterates over incoming messages.
289      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
290      and sends out a fish whenever it has nothing to do; in-between
291      doing the actual reductions (shared code below) it processes the
292      incoming messages and deals with delayed operations 
293      (see PendingFetches).
294      This is not the ugliest code you could imagine, but it's bloody close.
295
296    ------------------------------------------------------------------------ */
297
298 static Capability *
299 schedule (Capability *initialCapability, Task *task)
300 {
301   StgTSO *t;
302   Capability *cap;
303   StgThreadReturnCode ret;
304 #if defined(GRAN)
305   rtsEvent *event;
306 #elif defined(PARALLEL_HASKELL)
307   StgTSO *tso;
308   GlobalTaskId pe;
309   rtsBool receivedFinish = rtsFalse;
310 # if defined(DEBUG)
311   nat tp_size, sp_size; // stats only
312 # endif
313 #endif
314   nat prev_what_next;
315   rtsBool ready_to_gc;
316 #if defined(THREADED_RTS)
317   rtsBool first = rtsTrue;
318 #endif
319   
320   cap = initialCapability;
321
322   // Pre-condition: this task owns initialCapability.
323   // The sched_mutex is *NOT* held
324   // NB. on return, we still hold a capability.
325
326   debugTrace (DEBUG_sched, 
327               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
328               task, initialCapability);
329
330   schedulePreLoop();
331
332   // -----------------------------------------------------------
333   // Scheduler loop starts here:
334
335 #if defined(PARALLEL_HASKELL)
336 #define TERMINATION_CONDITION        (!receivedFinish)
337 #elif defined(GRAN)
338 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
339 #else
340 #define TERMINATION_CONDITION        rtsTrue
341 #endif
342
343   while (TERMINATION_CONDITION) {
344
345 #if defined(GRAN)
346       /* Choose the processor with the next event */
347       CurrentProc = event->proc;
348       CurrentTSO = event->tso;
349 #endif
350
351 #if defined(THREADED_RTS)
352       if (first) {
353           // don't yield the first time, we want a chance to run this
354           // thread for a bit, even if there are others banging at the
355           // door.
356           first = rtsFalse;
357           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
358       } else {
359           // Yield the capability to higher-priority tasks if necessary.
360           yieldCapability(&cap, task);
361       }
362 #endif
363       
364 #if defined(THREADED_RTS)
365       schedulePushWork(cap,task);
366 #endif
367
368     // Check whether we have re-entered the RTS from Haskell without
369     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
370     // call).
371     if (cap->in_haskell) {
372           errorBelch("schedule: re-entered unsafely.\n"
373                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
374           stg_exit(EXIT_FAILURE);
375     }
376
377     // The interruption / shutdown sequence.
378     // 
379     // In order to cleanly shut down the runtime, we want to:
380     //   * make sure that all main threads return to their callers
381     //     with the state 'Interrupted'.
382     //   * clean up all OS threads assocated with the runtime
383     //   * free all memory etc.
384     //
385     // So the sequence for ^C goes like this:
386     //
387     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
388     //     arranges for some Capability to wake up
389     //
390     //   * all threads in the system are halted, and the zombies are
391     //     placed on the run queue for cleaning up.  We acquire all
392     //     the capabilities in order to delete the threads, this is
393     //     done by scheduleDoGC() for convenience (because GC already
394     //     needs to acquire all the capabilities).  We can't kill
395     //     threads involved in foreign calls.
396     // 
397     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
398     //
399     //   * sched_state := SCHED_SHUTTING_DOWN
400     //
401     //   * all workers exit when the run queue on their capability
402     //     drains.  All main threads will also exit when their TSO
403     //     reaches the head of the run queue and they can return.
404     //
405     //   * eventually all Capabilities will shut down, and the RTS can
406     //     exit.
407     //
408     //   * We might be left with threads blocked in foreign calls, 
409     //     we should really attempt to kill these somehow (TODO);
410     
411     switch (sched_state) {
412     case SCHED_RUNNING:
413         break;
414     case SCHED_INTERRUPTING:
415         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
416 #if defined(THREADED_RTS)
417         discardSparksCap(cap);
418 #endif
419         /* scheduleDoGC() deletes all the threads */
420         cap = scheduleDoGC(cap,task,rtsFalse);
421         break;
422     case SCHED_SHUTTING_DOWN:
423         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
424         // If we are a worker, just exit.  If we're a bound thread
425         // then we will exit below when we've removed our TSO from
426         // the run queue.
427         if (task->tso == NULL && emptyRunQueue(cap)) {
428             return cap;
429         }
430         break;
431     default:
432         barf("sched_state: %d", sched_state);
433     }
434
435 #if defined(THREADED_RTS)
436     // If the run queue is empty, take a spark and turn it into a thread.
437     {
438         if (emptyRunQueue(cap)) {
439             StgClosure *spark;
440             spark = findSpark(cap);
441             if (spark != NULL) {
442                 debugTrace(DEBUG_sched,
443                            "turning spark of closure %p into a thread",
444                            (StgClosure *)spark);
445                 createSparkThread(cap,spark);     
446             }
447         }
448     }
449 #endif // THREADED_RTS
450
451     scheduleStartSignalHandlers(cap);
452
453     // Only check the black holes here if we've nothing else to do.
454     // During normal execution, the black hole list only gets checked
455     // at GC time, to avoid repeatedly traversing this possibly long
456     // list each time around the scheduler.
457     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
458
459     scheduleCheckWakeupThreads(cap);
460
461     scheduleCheckBlockedThreads(cap);
462
463     scheduleDetectDeadlock(cap,task);
464 #if defined(THREADED_RTS)
465     cap = task->cap;    // reload cap, it might have changed
466 #endif
467
468     // Normally, the only way we can get here with no threads to
469     // run is if a keyboard interrupt received during 
470     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
471     // Additionally, it is not fatal for the
472     // threaded RTS to reach here with no threads to run.
473     //
474     // win32: might be here due to awaitEvent() being abandoned
475     // as a result of a console event having been delivered.
476     if ( emptyRunQueue(cap) ) {
477 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
478         ASSERT(sched_state >= SCHED_INTERRUPTING);
479 #endif
480         continue; // nothing to do
481     }
482
483 #if defined(PARALLEL_HASKELL)
484     scheduleSendPendingMessages();
485     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
486         continue;
487
488 #if defined(SPARKS)
489     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
490 #endif
491
492     /* If we still have no work we need to send a FISH to get a spark
493        from another PE */
494     if (emptyRunQueue(cap)) {
495         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
496         ASSERT(rtsFalse); // should not happen at the moment
497     }
498     // from here: non-empty run queue.
499     //  TODO: merge above case with this, only one call processMessages() !
500     if (PacketsWaiting()) {  /* process incoming messages, if
501                                 any pending...  only in else
502                                 because getRemoteWork waits for
503                                 messages as well */
504         receivedFinish = processMessages();
505     }
506 #endif
507
508 #if defined(GRAN)
509     scheduleProcessEvent(event);
510 #endif
511
512     // 
513     // Get a thread to run
514     //
515     t = popRunQueue(cap);
516
517 #if defined(GRAN) || defined(PAR)
518     scheduleGranParReport(); // some kind of debuging output
519 #else
520     // Sanity check the thread we're about to run.  This can be
521     // expensive if there is lots of thread switching going on...
522     IF_DEBUG(sanity,checkTSO(t));
523 #endif
524
525 #if defined(THREADED_RTS)
526     // Check whether we can run this thread in the current task.
527     // If not, we have to pass our capability to the right task.
528     {
529         Task *bound = t->bound;
530       
531         if (bound) {
532             if (bound == task) {
533                 debugTrace(DEBUG_sched,
534                            "### Running thread %lu in bound thread", (unsigned long)t->id);
535                 // yes, the Haskell thread is bound to the current native thread
536             } else {
537                 debugTrace(DEBUG_sched,
538                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
539                 // no, bound to a different Haskell thread: pass to that thread
540                 pushOnRunQueue(cap,t);
541                 continue;
542             }
543         } else {
544             // The thread we want to run is unbound.
545             if (task->tso) { 
546                 debugTrace(DEBUG_sched,
547                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
548                 // no, the current native thread is bound to a different
549                 // Haskell thread, so pass it to any worker thread
550                 pushOnRunQueue(cap,t);
551                 continue; 
552             }
553         }
554     }
555 #endif
556
557     cap->r.rCurrentTSO = t;
558     
559     /* context switches are initiated by the timer signal, unless
560      * the user specified "context switch as often as possible", with
561      * +RTS -C0
562      */
563     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
564         && !emptyThreadQueues(cap)) {
565         context_switch = 1;
566     }
567          
568 run_thread:
569
570     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
571                               (long)t->id, whatNext_strs[t->what_next]);
572
573     startHeapProfTimer();
574
575     // Check for exceptions blocked on this thread
576     maybePerformBlockedException (cap, t);
577
578     // ----------------------------------------------------------------------
579     // Run the current thread 
580
581     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
582     ASSERT(t->cap == cap);
583
584     prev_what_next = t->what_next;
585
586     errno = t->saved_errno;
587 #if mingw32_HOST_OS
588     SetLastError(t->saved_winerror);
589 #endif
590
591     cap->in_haskell = rtsTrue;
592
593     dirtyTSO(t);
594
595     recent_activity = ACTIVITY_YES;
596
597     switch (prev_what_next) {
598         
599     case ThreadKilled:
600     case ThreadComplete:
601         /* Thread already finished, return to scheduler. */
602         ret = ThreadFinished;
603         break;
604         
605     case ThreadRunGHC:
606     {
607         StgRegTable *r;
608         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
609         cap = regTableToCapability(r);
610         ret = r->rRet;
611         break;
612     }
613     
614     case ThreadInterpret:
615         cap = interpretBCO(cap);
616         ret = cap->r.rRet;
617         break;
618         
619     default:
620         barf("schedule: invalid what_next field");
621     }
622
623     cap->in_haskell = rtsFalse;
624
625     // The TSO might have moved, eg. if it re-entered the RTS and a GC
626     // happened.  So find the new location:
627     t = cap->r.rCurrentTSO;
628
629     // We have run some Haskell code: there might be blackhole-blocked
630     // threads to wake up now.
631     // Lock-free test here should be ok, we're just setting a flag.
632     if ( blackhole_queue != END_TSO_QUEUE ) {
633         blackholes_need_checking = rtsTrue;
634     }
635     
636     // And save the current errno in this thread.
637     // XXX: possibly bogus for SMP because this thread might already
638     // be running again, see code below.
639     t->saved_errno = errno;
640 #if mingw32_HOST_OS
641     // Similarly for Windows error code
642     t->saved_winerror = GetLastError();
643 #endif
644
645 #if defined(THREADED_RTS)
646     // If ret is ThreadBlocked, and this Task is bound to the TSO that
647     // blocked, we are in limbo - the TSO is now owned by whatever it
648     // is blocked on, and may in fact already have been woken up,
649     // perhaps even on a different Capability.  It may be the case
650     // that task->cap != cap.  We better yield this Capability
651     // immediately and return to normaility.
652     if (ret == ThreadBlocked) {
653         debugTrace(DEBUG_sched,
654                    "--<< thread %lu (%s) stopped: blocked",
655                    (unsigned long)t->id, whatNext_strs[t->what_next]);
656         continue;
657     }
658 #endif
659
660     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
661     ASSERT(t->cap == cap);
662
663     // ----------------------------------------------------------------------
664     
665     // Costs for the scheduler are assigned to CCS_SYSTEM
666     stopHeapProfTimer();
667 #if defined(PROFILING)
668     CCCS = CCS_SYSTEM;
669 #endif
670     
671     schedulePostRunThread();
672
673     ready_to_gc = rtsFalse;
674
675     switch (ret) {
676     case HeapOverflow:
677         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
678         break;
679
680     case StackOverflow:
681         scheduleHandleStackOverflow(cap,task,t);
682         break;
683
684     case ThreadYielding:
685         if (scheduleHandleYield(cap, t, prev_what_next)) {
686             // shortcut for switching between compiler/interpreter:
687             goto run_thread; 
688         }
689         break;
690
691     case ThreadBlocked:
692         scheduleHandleThreadBlocked(t);
693         break;
694
695     case ThreadFinished:
696         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
697         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
698         break;
699
700     default:
701       barf("schedule: invalid thread return code %d", (int)ret);
702     }
703
704     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
705       cap = scheduleDoGC(cap,task,rtsFalse);
706     }
707   } /* end of while() */
708
709   debugTrace(PAR_DEBUG_verbose,
710              "== Leaving schedule() after having received Finish");
711 }
712
713 /* ----------------------------------------------------------------------------
714  * Setting up the scheduler loop
715  * ------------------------------------------------------------------------- */
716
717 static void
718 schedulePreLoop(void)
719 {
720 #if defined(GRAN) 
721     /* set up first event to get things going */
722     /* ToDo: assign costs for system setup and init MainTSO ! */
723     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
724               ContinueThread, 
725               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
726     
727     debugTrace (DEBUG_gran,
728                 "GRAN: Init CurrentTSO (in schedule) = %p", 
729                 CurrentTSO);
730     IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
731     
732     if (RtsFlags.GranFlags.Light) {
733         /* Save current time; GranSim Light only */
734         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
735     }      
736 #endif
737 }
738
739 /* -----------------------------------------------------------------------------
740  * schedulePushWork()
741  *
742  * Push work to other Capabilities if we have some.
743  * -------------------------------------------------------------------------- */
744
745 #if defined(THREADED_RTS)
746 static void
747 schedulePushWork(Capability *cap USED_IF_THREADS, 
748                  Task *task      USED_IF_THREADS)
749 {
750     Capability *free_caps[n_capabilities], *cap0;
751     nat i, n_free_caps;
752
753     // migration can be turned off with +RTS -qg
754     if (!RtsFlags.ParFlags.migrate) return;
755
756     // Check whether we have more threads on our run queue, or sparks
757     // in our pool, that we could hand to another Capability.
758     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
759         && sparkPoolSizeCap(cap) < 2) {
760         return;
761     }
762
763     // First grab as many free Capabilities as we can.
764     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
765         cap0 = &capabilities[i];
766         if (cap != cap0 && tryGrabCapability(cap0,task)) {
767             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
768                 // it already has some work, we just grabbed it at 
769                 // the wrong moment.  Or maybe it's deadlocked!
770                 releaseCapability(cap0);
771             } else {
772                 free_caps[n_free_caps++] = cap0;
773             }
774         }
775     }
776
777     // we now have n_free_caps free capabilities stashed in
778     // free_caps[].  Share our run queue equally with them.  This is
779     // probably the simplest thing we could do; improvements we might
780     // want to do include:
781     //
782     //   - giving high priority to moving relatively new threads, on 
783     //     the gournds that they haven't had time to build up a
784     //     working set in the cache on this CPU/Capability.
785     //
786     //   - giving low priority to moving long-lived threads
787
788     if (n_free_caps > 0) {
789         StgTSO *prev, *t, *next;
790         rtsBool pushed_to_all;
791
792         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
793
794         i = 0;
795         pushed_to_all = rtsFalse;
796
797         if (cap->run_queue_hd != END_TSO_QUEUE) {
798             prev = cap->run_queue_hd;
799             t = prev->link;
800             prev->link = END_TSO_QUEUE;
801             for (; t != END_TSO_QUEUE; t = next) {
802                 next = t->link;
803                 t->link = END_TSO_QUEUE;
804                 if (t->what_next == ThreadRelocated
805                     || t->bound == task // don't move my bound thread
806                     || tsoLocked(t)) {  // don't move a locked thread
807                     prev->link = t;
808                     prev = t;
809                 } else if (i == n_free_caps) {
810                     pushed_to_all = rtsTrue;
811                     i = 0;
812                     // keep one for us
813                     prev->link = t;
814                     prev = t;
815                 } else {
816                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
817                     appendToRunQueue(free_caps[i],t);
818                     if (t->bound) { t->bound->cap = free_caps[i]; }
819                     t->cap = free_caps[i];
820                     i++;
821                 }
822             }
823             cap->run_queue_tl = prev;
824         }
825
826         // If there are some free capabilities that we didn't push any
827         // threads to, then try to push a spark to each one.
828         if (!pushed_to_all) {
829             StgClosure *spark;
830             // i is the next free capability to push to
831             for (; i < n_free_caps; i++) {
832                 if (emptySparkPoolCap(free_caps[i])) {
833                     spark = findSpark(cap);
834                     if (spark != NULL) {
835                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
836                         newSpark(&(free_caps[i]->r), spark);
837                     }
838                 }
839             }
840         }
841
842         // release the capabilities
843         for (i = 0; i < n_free_caps; i++) {
844             task->cap = free_caps[i];
845             releaseCapability(free_caps[i]);
846         }
847     }
848     task->cap = cap; // reset to point to our Capability.
849 }
850 #endif
851
852 /* ----------------------------------------------------------------------------
853  * Start any pending signal handlers
854  * ------------------------------------------------------------------------- */
855
856 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
857 static void
858 scheduleStartSignalHandlers(Capability *cap)
859 {
860     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
861         // safe outside the lock
862         startSignalHandlers(cap);
863     }
864 }
865 #else
866 static void
867 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
868 {
869 }
870 #endif
871
872 /* ----------------------------------------------------------------------------
873  * Check for blocked threads that can be woken up.
874  * ------------------------------------------------------------------------- */
875
876 static void
877 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
878 {
879 #if !defined(THREADED_RTS)
880     //
881     // Check whether any waiting threads need to be woken up.  If the
882     // run queue is empty, and there are no other tasks running, we
883     // can wait indefinitely for something to happen.
884     //
885     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
886     {
887         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
888     }
889 #endif
890 }
891
892
893 /* ----------------------------------------------------------------------------
894  * Check for threads woken up by other Capabilities
895  * ------------------------------------------------------------------------- */
896
897 static void
898 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
899 {
900 #if defined(THREADED_RTS)
901     // Any threads that were woken up by other Capabilities get
902     // appended to our run queue.
903     if (!emptyWakeupQueue(cap)) {
904         ACQUIRE_LOCK(&cap->lock);
905         if (emptyRunQueue(cap)) {
906             cap->run_queue_hd = cap->wakeup_queue_hd;
907             cap->run_queue_tl = cap->wakeup_queue_tl;
908         } else {
909             cap->run_queue_tl->link = cap->wakeup_queue_hd;
910             cap->run_queue_tl = cap->wakeup_queue_tl;
911         }
912         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
913         RELEASE_LOCK(&cap->lock);
914     }
915 #endif
916 }
917
918 /* ----------------------------------------------------------------------------
919  * Check for threads blocked on BLACKHOLEs that can be woken up
920  * ------------------------------------------------------------------------- */
921 static void
922 scheduleCheckBlackHoles (Capability *cap)
923 {
924     if ( blackholes_need_checking ) // check without the lock first
925     {
926         ACQUIRE_LOCK(&sched_mutex);
927         if ( blackholes_need_checking ) {
928             checkBlackHoles(cap);
929             blackholes_need_checking = rtsFalse;
930         }
931         RELEASE_LOCK(&sched_mutex);
932     }
933 }
934
935 /* ----------------------------------------------------------------------------
936  * Detect deadlock conditions and attempt to resolve them.
937  * ------------------------------------------------------------------------- */
938
939 static void
940 scheduleDetectDeadlock (Capability *cap, Task *task)
941 {
942
943 #if defined(PARALLEL_HASKELL)
944     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
945     return;
946 #endif
947
948     /* 
949      * Detect deadlock: when we have no threads to run, there are no
950      * threads blocked, waiting for I/O, or sleeping, and all the
951      * other tasks are waiting for work, we must have a deadlock of
952      * some description.
953      */
954     if ( emptyThreadQueues(cap) )
955     {
956 #if defined(THREADED_RTS)
957         /* 
958          * In the threaded RTS, we only check for deadlock if there
959          * has been no activity in a complete timeslice.  This means
960          * we won't eagerly start a full GC just because we don't have
961          * any threads to run currently.
962          */
963         if (recent_activity != ACTIVITY_INACTIVE) return;
964 #endif
965
966         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
967
968         // Garbage collection can release some new threads due to
969         // either (a) finalizers or (b) threads resurrected because
970         // they are unreachable and will therefore be sent an
971         // exception.  Any threads thus released will be immediately
972         // runnable.
973         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
974
975         recent_activity = ACTIVITY_DONE_GC;
976         
977         if ( !emptyRunQueue(cap) ) return;
978
979 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
980         /* If we have user-installed signal handlers, then wait
981          * for signals to arrive rather then bombing out with a
982          * deadlock.
983          */
984         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
985             debugTrace(DEBUG_sched,
986                        "still deadlocked, waiting for signals...");
987
988             awaitUserSignals();
989
990             if (signals_pending()) {
991                 startSignalHandlers(cap);
992             }
993
994             // either we have threads to run, or we were interrupted:
995             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
996         }
997 #endif
998
999 #if !defined(THREADED_RTS)
1000         /* Probably a real deadlock.  Send the current main thread the
1001          * Deadlock exception.
1002          */
1003         if (task->tso) {
1004             switch (task->tso->why_blocked) {
1005             case BlockedOnSTM:
1006             case BlockedOnBlackHole:
1007             case BlockedOnException:
1008             case BlockedOnMVar:
1009                 throwToSingleThreaded(cap, task->tso, 
1010                                       (StgClosure *)NonTermination_closure);
1011                 return;
1012             default:
1013                 barf("deadlock: main thread blocked in a strange way");
1014             }
1015         }
1016         return;
1017 #endif
1018     }
1019 }
1020
1021 /* ----------------------------------------------------------------------------
1022  * Process an event (GRAN only)
1023  * ------------------------------------------------------------------------- */
1024
1025 #if defined(GRAN)
1026 static StgTSO *
1027 scheduleProcessEvent(rtsEvent *event)
1028 {
1029     StgTSO *t;
1030
1031     if (RtsFlags.GranFlags.Light)
1032       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1033
1034     /* adjust time based on time-stamp */
1035     if (event->time > CurrentTime[CurrentProc] &&
1036         event->evttype != ContinueThread)
1037       CurrentTime[CurrentProc] = event->time;
1038     
1039     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1040     if (!RtsFlags.GranFlags.Light)
1041       handleIdlePEs();
1042
1043     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1044
1045     /* main event dispatcher in GranSim */
1046     switch (event->evttype) {
1047       /* Should just be continuing execution */
1048     case ContinueThread:
1049       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1050       /* ToDo: check assertion
1051       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1052              run_queue_hd != END_TSO_QUEUE);
1053       */
1054       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1055       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1056           procStatus[CurrentProc]==Fetching) {
1057         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1058               CurrentTSO->id, CurrentTSO, CurrentProc);
1059         goto next_thread;
1060       } 
1061       /* Ignore ContinueThreads for completed threads */
1062       if (CurrentTSO->what_next == ThreadComplete) {
1063         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1064               CurrentTSO->id, CurrentTSO, CurrentProc);
1065         goto next_thread;
1066       } 
1067       /* Ignore ContinueThreads for threads that are being migrated */
1068       if (PROCS(CurrentTSO)==Nowhere) { 
1069         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1070               CurrentTSO->id, CurrentTSO, CurrentProc);
1071         goto next_thread;
1072       }
1073       /* The thread should be at the beginning of the run queue */
1074       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1075         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1076               CurrentTSO->id, CurrentTSO, CurrentProc);
1077         break; // run the thread anyway
1078       }
1079       /*
1080       new_event(proc, proc, CurrentTime[proc],
1081                 FindWork,
1082                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1083       goto next_thread; 
1084       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1085       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1086
1087     case FetchNode:
1088       do_the_fetchnode(event);
1089       goto next_thread;             /* handle next event in event queue  */
1090       
1091     case GlobalBlock:
1092       do_the_globalblock(event);
1093       goto next_thread;             /* handle next event in event queue  */
1094       
1095     case FetchReply:
1096       do_the_fetchreply(event);
1097       goto next_thread;             /* handle next event in event queue  */
1098       
1099     case UnblockThread:   /* Move from the blocked queue to the tail of */
1100       do_the_unblock(event);
1101       goto next_thread;             /* handle next event in event queue  */
1102       
1103     case ResumeThread:  /* Move from the blocked queue to the tail of */
1104       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1105       event->tso->gran.blocktime += 
1106         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1107       do_the_startthread(event);
1108       goto next_thread;             /* handle next event in event queue  */
1109       
1110     case StartThread:
1111       do_the_startthread(event);
1112       goto next_thread;             /* handle next event in event queue  */
1113       
1114     case MoveThread:
1115       do_the_movethread(event);
1116       goto next_thread;             /* handle next event in event queue  */
1117       
1118     case MoveSpark:
1119       do_the_movespark(event);
1120       goto next_thread;             /* handle next event in event queue  */
1121       
1122     case FindWork:
1123       do_the_findwork(event);
1124       goto next_thread;             /* handle next event in event queue  */
1125       
1126     default:
1127       barf("Illegal event type %u\n", event->evttype);
1128     }  /* switch */
1129     
1130     /* This point was scheduler_loop in the old RTS */
1131
1132     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1133
1134     TimeOfLastEvent = CurrentTime[CurrentProc];
1135     TimeOfNextEvent = get_time_of_next_event();
1136     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1137     // CurrentTSO = ThreadQueueHd;
1138
1139     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1140                          TimeOfNextEvent));
1141
1142     if (RtsFlags.GranFlags.Light) 
1143       GranSimLight_leave_system(event, &ActiveTSO); 
1144
1145     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1146
1147     IF_DEBUG(gran, 
1148              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1149
1150     /* in a GranSim setup the TSO stays on the run queue */
1151     t = CurrentTSO;
1152     /* Take a thread from the run queue. */
1153     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1154
1155     IF_DEBUG(gran, 
1156              debugBelch("GRAN: About to run current thread, which is\n");
1157              G_TSO(t,5));
1158
1159     context_switch = 0; // turned on via GranYield, checking events and time slice
1160
1161     IF_DEBUG(gran, 
1162              DumpGranEvent(GR_SCHEDULE, t));
1163
1164     procStatus[CurrentProc] = Busy;
1165 }
1166 #endif // GRAN
1167
1168 /* ----------------------------------------------------------------------------
1169  * Send pending messages (PARALLEL_HASKELL only)
1170  * ------------------------------------------------------------------------- */
1171
1172 #if defined(PARALLEL_HASKELL)
1173 static StgTSO *
1174 scheduleSendPendingMessages(void)
1175 {
1176     StgSparkPool *pool;
1177     rtsSpark spark;
1178     StgTSO *t;
1179
1180 # if defined(PAR) // global Mem.Mgmt., omit for now
1181     if (PendingFetches != END_BF_QUEUE) {
1182         processFetches();
1183     }
1184 # endif
1185     
1186     if (RtsFlags.ParFlags.BufferTime) {
1187         // if we use message buffering, we must send away all message
1188         // packets which have become too old...
1189         sendOldBuffers(); 
1190     }
1191 }
1192 #endif
1193
1194 /* ----------------------------------------------------------------------------
1195  * Activate spark threads (PARALLEL_HASKELL only)
1196  * ------------------------------------------------------------------------- */
1197
1198 #if defined(PARALLEL_HASKELL)
1199 static void
1200 scheduleActivateSpark(void)
1201 {
1202 #if defined(SPARKS)
1203   ASSERT(emptyRunQueue());
1204 /* We get here if the run queue is empty and want some work.
1205    We try to turn a spark into a thread, and add it to the run queue,
1206    from where it will be picked up in the next iteration of the scheduler
1207    loop.
1208 */
1209
1210       /* :-[  no local threads => look out for local sparks */
1211       /* the spark pool for the current PE */
1212       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1213       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1214           pool->hd < pool->tl) {
1215         /* 
1216          * ToDo: add GC code check that we really have enough heap afterwards!!
1217          * Old comment:
1218          * If we're here (no runnable threads) and we have pending
1219          * sparks, we must have a space problem.  Get enough space
1220          * to turn one of those pending sparks into a
1221          * thread... 
1222          */
1223
1224         spark = findSpark(rtsFalse);            /* get a spark */
1225         if (spark != (rtsSpark) NULL) {
1226           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1227           IF_PAR_DEBUG(fish, // schedule,
1228                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1229                              tso->id, tso, advisory_thread_count));
1230
1231           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1232             IF_PAR_DEBUG(fish, // schedule,
1233                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1234                             spark));
1235             return rtsFalse; /* failed to generate a thread */
1236           }                  /* otherwise fall through & pick-up new tso */
1237         } else {
1238           IF_PAR_DEBUG(fish, // schedule,
1239                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1240                              spark_queue_len(pool)));
1241           return rtsFalse;  /* failed to generate a thread */
1242         }
1243         return rtsTrue;  /* success in generating a thread */
1244   } else { /* no more threads permitted or pool empty */
1245     return rtsFalse;  /* failed to generateThread */
1246   }
1247 #else
1248   tso = NULL; // avoid compiler warning only
1249   return rtsFalse;  /* dummy in non-PAR setup */
1250 #endif // SPARKS
1251 }
1252 #endif // PARALLEL_HASKELL
1253
1254 /* ----------------------------------------------------------------------------
1255  * Get work from a remote node (PARALLEL_HASKELL only)
1256  * ------------------------------------------------------------------------- */
1257     
1258 #if defined(PARALLEL_HASKELL)
1259 static rtsBool
1260 scheduleGetRemoteWork(rtsBool *receivedFinish)
1261 {
1262   ASSERT(emptyRunQueue());
1263
1264   if (RtsFlags.ParFlags.BufferTime) {
1265         IF_PAR_DEBUG(verbose, 
1266                 debugBelch("...send all pending data,"));
1267         {
1268           nat i;
1269           for (i=1; i<=nPEs; i++)
1270             sendImmediately(i); // send all messages away immediately
1271         }
1272   }
1273 # ifndef SPARKS
1274         //++EDEN++ idle() , i.e. send all buffers, wait for work
1275         // suppress fishing in EDEN... just look for incoming messages
1276         // (blocking receive)
1277   IF_PAR_DEBUG(verbose, 
1278                debugBelch("...wait for incoming messages...\n"));
1279   *receivedFinish = processMessages(); // blocking receive...
1280
1281         // and reenter scheduling loop after having received something
1282         // (return rtsFalse below)
1283
1284 # else /* activate SPARKS machinery */
1285 /* We get here, if we have no work, tried to activate a local spark, but still
1286    have no work. We try to get a remote spark, by sending a FISH message.
1287    Thread migration should be added here, and triggered when a sequence of 
1288    fishes returns without work. */
1289         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1290
1291       /* =8-[  no local sparks => look for work on other PEs */
1292         /*
1293          * We really have absolutely no work.  Send out a fish
1294          * (there may be some out there already), and wait for
1295          * something to arrive.  We clearly can't run any threads
1296          * until a SCHEDULE or RESUME arrives, and so that's what
1297          * we're hoping to see.  (Of course, we still have to
1298          * respond to other types of messages.)
1299          */
1300         rtsTime now = msTime() /*CURRENT_TIME*/;
1301         IF_PAR_DEBUG(verbose, 
1302                      debugBelch("--  now=%ld\n", now));
1303         IF_PAR_DEBUG(fish, // verbose,
1304              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1305                  (last_fish_arrived_at!=0 &&
1306                   last_fish_arrived_at+delay > now)) {
1307                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1308                      now, last_fish_arrived_at+delay, 
1309                      last_fish_arrived_at,
1310                      delay);
1311              });
1312   
1313         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1314             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1315           if (last_fish_arrived_at==0 ||
1316               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1317             /* outstandingFishes is set in sendFish, processFish;
1318                avoid flooding system with fishes via delay */
1319     next_fish_to_send_at = 0;  
1320   } else {
1321     /* ToDo: this should be done in the main scheduling loop to avoid the
1322              busy wait here; not so bad if fish delay is very small  */
1323     int iq = 0; // DEBUGGING -- HWL
1324     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1325     /* send a fish when ready, but process messages that arrive in the meantime */
1326     do {
1327       if (PacketsWaiting()) {
1328         iq++; // DEBUGGING
1329         *receivedFinish = processMessages();
1330       }
1331       now = msTime();
1332     } while (!*receivedFinish || now<next_fish_to_send_at);
1333     // JB: This means the fish could become obsolete, if we receive
1334     // work. Better check for work again? 
1335     // last line: while (!receivedFinish || !haveWork || now<...)
1336     // next line: if (receivedFinish || haveWork )
1337
1338     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1339       return rtsFalse;  // NB: this will leave scheduler loop
1340                         // immediately after return!
1341                           
1342     IF_PAR_DEBUG(fish, // verbose,
1343                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1344
1345   }
1346
1347     // JB: IMHO, this should all be hidden inside sendFish(...)
1348     /* pe = choosePE(); 
1349        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1350                 NEW_FISH_HUNGER);
1351
1352     // Global statistics: count no. of fishes
1353     if (RtsFlags.ParFlags.ParStats.Global &&
1354          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1355            globalParStats.tot_fish_mess++;
1356            }
1357     */ 
1358
1359   /* delayed fishes must have been sent by now! */
1360   next_fish_to_send_at = 0;  
1361   }
1362       
1363   *receivedFinish = processMessages();
1364 # endif /* SPARKS */
1365
1366  return rtsFalse;
1367  /* NB: this function always returns rtsFalse, meaning the scheduler
1368     loop continues with the next iteration; 
1369     rationale: 
1370       return code means success in finding work; we enter this function
1371       if there is no local work, thus have to send a fish which takes
1372       time until it arrives with work; in the meantime we should process
1373       messages in the main loop;
1374  */
1375 }
1376 #endif // PARALLEL_HASKELL
1377
1378 /* ----------------------------------------------------------------------------
1379  * PAR/GRAN: Report stats & debugging info(?)
1380  * ------------------------------------------------------------------------- */
1381
1382 #if defined(PAR) || defined(GRAN)
1383 static void
1384 scheduleGranParReport(void)
1385 {
1386   ASSERT(run_queue_hd != END_TSO_QUEUE);
1387
1388   /* Take a thread from the run queue, if we have work */
1389   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1390
1391     /* If this TSO has got its outport closed in the meantime, 
1392      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1393      * It has to be marked as TH_DEAD for this purpose.
1394      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1395
1396 JB: TODO: investigate wether state change field could be nuked
1397      entirely and replaced by the normal tso state (whatnext
1398      field). All we want to do is to kill tsos from outside.
1399      */
1400
1401     /* ToDo: write something to the log-file
1402     if (RTSflags.ParFlags.granSimStats && !sameThread)
1403         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1404
1405     CurrentTSO = t;
1406     */
1407     /* the spark pool for the current PE */
1408     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1409
1410     IF_DEBUG(scheduler, 
1411              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1412                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1413
1414     IF_PAR_DEBUG(fish,
1415              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1416                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1417
1418     if (RtsFlags.ParFlags.ParStats.Full && 
1419         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1420         (emitSchedule || // forced emit
1421          (t && LastTSO && t->id != LastTSO->id))) {
1422       /* 
1423          we are running a different TSO, so write a schedule event to log file
1424          NB: If we use fair scheduling we also have to write  a deschedule 
1425              event for LastTSO; with unfair scheduling we know that the
1426              previous tso has blocked whenever we switch to another tso, so
1427              we don't need it in GUM for now
1428       */
1429       IF_PAR_DEBUG(fish, // schedule,
1430                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1431
1432       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1433                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1434       emitSchedule = rtsFalse;
1435     }
1436 }     
1437 #endif
1438
1439 /* ----------------------------------------------------------------------------
1440  * After running a thread...
1441  * ------------------------------------------------------------------------- */
1442
1443 static void
1444 schedulePostRunThread(void)
1445 {
1446 #if defined(PAR)
1447     /* HACK 675: if the last thread didn't yield, make sure to print a 
1448        SCHEDULE event to the log file when StgRunning the next thread, even
1449        if it is the same one as before */
1450     LastTSO = t; 
1451     TimeOfLastYield = CURRENT_TIME;
1452 #endif
1453
1454   /* some statistics gathering in the parallel case */
1455
1456 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1457   switch (ret) {
1458     case HeapOverflow:
1459 # if defined(GRAN)
1460       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1461       globalGranStats.tot_heapover++;
1462 # elif defined(PAR)
1463       globalParStats.tot_heapover++;
1464 # endif
1465       break;
1466
1467      case StackOverflow:
1468 # if defined(GRAN)
1469       IF_DEBUG(gran, 
1470                DumpGranEvent(GR_DESCHEDULE, t));
1471       globalGranStats.tot_stackover++;
1472 # elif defined(PAR)
1473       // IF_DEBUG(par, 
1474       // DumpGranEvent(GR_DESCHEDULE, t);
1475       globalParStats.tot_stackover++;
1476 # endif
1477       break;
1478
1479     case ThreadYielding:
1480 # if defined(GRAN)
1481       IF_DEBUG(gran, 
1482                DumpGranEvent(GR_DESCHEDULE, t));
1483       globalGranStats.tot_yields++;
1484 # elif defined(PAR)
1485       // IF_DEBUG(par, 
1486       // DumpGranEvent(GR_DESCHEDULE, t);
1487       globalParStats.tot_yields++;
1488 # endif
1489       break; 
1490
1491     case ThreadBlocked:
1492 # if defined(GRAN)
1493         debugTrace(DEBUG_sched, 
1494                    "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1495                    t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1496                    (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1497                if (t->block_info.closure!=(StgClosure*)NULL)
1498                  print_bq(t->block_info.closure);
1499                debugBelch("\n"));
1500
1501       // ??? needed; should emit block before
1502       IF_DEBUG(gran, 
1503                DumpGranEvent(GR_DESCHEDULE, t)); 
1504       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1505       /*
1506         ngoq Dogh!
1507       ASSERT(procStatus[CurrentProc]==Busy || 
1508               ((procStatus[CurrentProc]==Fetching) && 
1509               (t->block_info.closure!=(StgClosure*)NULL)));
1510       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1511           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1512             procStatus[CurrentProc]==Fetching)) 
1513         procStatus[CurrentProc] = Idle;
1514       */
1515 # elif defined(PAR)
1516 //++PAR++  blockThread() writes the event (change?)
1517 # endif
1518     break;
1519
1520   case ThreadFinished:
1521     break;
1522
1523   default:
1524     barf("parGlobalStats: unknown return code");
1525     break;
1526     }
1527 #endif
1528 }
1529
1530 /* -----------------------------------------------------------------------------
1531  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1532  * -------------------------------------------------------------------------- */
1533
1534 static rtsBool
1535 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1536 {
1537     // did the task ask for a large block?
1538     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1539         // if so, get one and push it on the front of the nursery.
1540         bdescr *bd;
1541         lnat blocks;
1542         
1543         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1544         
1545         debugTrace(DEBUG_sched,
1546                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1547                    (long)t->id, whatNext_strs[t->what_next], blocks);
1548     
1549         // don't do this if the nursery is (nearly) full, we'll GC first.
1550         if (cap->r.rCurrentNursery->link != NULL ||
1551             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1552                                                // if the nursery has only one block.
1553             
1554             ACQUIRE_SM_LOCK
1555             bd = allocGroup( blocks );
1556             RELEASE_SM_LOCK
1557             cap->r.rNursery->n_blocks += blocks;
1558             
1559             // link the new group into the list
1560             bd->link = cap->r.rCurrentNursery;
1561             bd->u.back = cap->r.rCurrentNursery->u.back;
1562             if (cap->r.rCurrentNursery->u.back != NULL) {
1563                 cap->r.rCurrentNursery->u.back->link = bd;
1564             } else {
1565 #if !defined(THREADED_RTS)
1566                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1567                        g0s0 == cap->r.rNursery);
1568 #endif
1569                 cap->r.rNursery->blocks = bd;
1570             }             
1571             cap->r.rCurrentNursery->u.back = bd;
1572             
1573             // initialise it as a nursery block.  We initialise the
1574             // step, gen_no, and flags field of *every* sub-block in
1575             // this large block, because this is easier than making
1576             // sure that we always find the block head of a large
1577             // block whenever we call Bdescr() (eg. evacuate() and
1578             // isAlive() in the GC would both have to do this, at
1579             // least).
1580             { 
1581                 bdescr *x;
1582                 for (x = bd; x < bd + blocks; x++) {
1583                     x->step = cap->r.rNursery;
1584                     x->gen_no = 0;
1585                     x->flags = 0;
1586                 }
1587             }
1588             
1589             // This assert can be a killer if the app is doing lots
1590             // of large block allocations.
1591             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1592             
1593             // now update the nursery to point to the new block
1594             cap->r.rCurrentNursery = bd;
1595             
1596             // we might be unlucky and have another thread get on the
1597             // run queue before us and steal the large block, but in that
1598             // case the thread will just end up requesting another large
1599             // block.
1600             pushOnRunQueue(cap,t);
1601             return rtsFalse;  /* not actually GC'ing */
1602         }
1603     }
1604     
1605     debugTrace(DEBUG_sched,
1606                "--<< thread %ld (%s) stopped: HeapOverflow\n", 
1607                (long)t->id, whatNext_strs[t->what_next]);
1608
1609 #if defined(GRAN)
1610     ASSERT(!is_on_queue(t,CurrentProc));
1611 #elif defined(PARALLEL_HASKELL)
1612     /* Currently we emit a DESCHEDULE event before GC in GUM.
1613        ToDo: either add separate event to distinguish SYSTEM time from rest
1614        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1615     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1616         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1617                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1618         emitSchedule = rtsTrue;
1619     }
1620 #endif
1621       
1622     pushOnRunQueue(cap,t);
1623     return rtsTrue;
1624     /* actual GC is done at the end of the while loop in schedule() */
1625 }
1626
1627 /* -----------------------------------------------------------------------------
1628  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1629  * -------------------------------------------------------------------------- */
1630
1631 static void
1632 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1633 {
1634     debugTrace (DEBUG_sched,
1635                 "--<< thread %ld (%s) stopped, StackOverflow", 
1636                 (long)t->id, whatNext_strs[t->what_next]);
1637
1638     /* just adjust the stack for this thread, then pop it back
1639      * on the run queue.
1640      */
1641     { 
1642         /* enlarge the stack */
1643         StgTSO *new_t = threadStackOverflow(cap, t);
1644         
1645         /* The TSO attached to this Task may have moved, so update the
1646          * pointer to it.
1647          */
1648         if (task->tso == t) {
1649             task->tso = new_t;
1650         }
1651         pushOnRunQueue(cap,new_t);
1652     }
1653 }
1654
1655 /* -----------------------------------------------------------------------------
1656  * Handle a thread that returned to the scheduler with ThreadYielding
1657  * -------------------------------------------------------------------------- */
1658
1659 static rtsBool
1660 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1661 {
1662     // Reset the context switch flag.  We don't do this just before
1663     // running the thread, because that would mean we would lose ticks
1664     // during GC, which can lead to unfair scheduling (a thread hogs
1665     // the CPU because the tick always arrives during GC).  This way
1666     // penalises threads that do a lot of allocation, but that seems
1667     // better than the alternative.
1668     context_switch = 0;
1669     
1670     /* put the thread back on the run queue.  Then, if we're ready to
1671      * GC, check whether this is the last task to stop.  If so, wake
1672      * up the GC thread.  getThread will block during a GC until the
1673      * GC is finished.
1674      */
1675 #ifdef DEBUG
1676     if (t->what_next != prev_what_next) {
1677         debugTrace(DEBUG_sched,
1678                    "--<< thread %ld (%s) stopped to switch evaluators", 
1679                    (long)t->id, whatNext_strs[t->what_next]);
1680     } else {
1681         debugTrace(DEBUG_sched,
1682                    "--<< thread %ld (%s) stopped, yielding",
1683                    (long)t->id, whatNext_strs[t->what_next]);
1684     }
1685 #endif
1686     
1687     IF_DEBUG(sanity,
1688              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1689              checkTSO(t));
1690     ASSERT(t->link == END_TSO_QUEUE);
1691     
1692     // Shortcut if we're just switching evaluators: don't bother
1693     // doing stack squeezing (which can be expensive), just run the
1694     // thread.
1695     if (t->what_next != prev_what_next) {
1696         return rtsTrue;
1697     }
1698     
1699 #if defined(GRAN)
1700     ASSERT(!is_on_queue(t,CurrentProc));
1701       
1702     IF_DEBUG(sanity,
1703              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1704              checkThreadQsSanity(rtsTrue));
1705
1706 #endif
1707
1708     addToRunQueue(cap,t);
1709
1710 #if defined(GRAN)
1711     /* add a ContinueThread event to actually process the thread */
1712     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1713               ContinueThread,
1714               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1715     IF_GRAN_DEBUG(bq, 
1716                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1717                   G_EVENTQ(0);
1718                   G_CURR_THREADQ(0));
1719 #endif
1720     return rtsFalse;
1721 }
1722
1723 /* -----------------------------------------------------------------------------
1724  * Handle a thread that returned to the scheduler with ThreadBlocked
1725  * -------------------------------------------------------------------------- */
1726
1727 static void
1728 scheduleHandleThreadBlocked( StgTSO *t
1729 #if !defined(GRAN) && !defined(DEBUG)
1730     STG_UNUSED
1731 #endif
1732     )
1733 {
1734 #if defined(GRAN)
1735     IF_DEBUG(scheduler,
1736              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1737                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1738              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1739     
1740     // ??? needed; should emit block before
1741     IF_DEBUG(gran, 
1742              DumpGranEvent(GR_DESCHEDULE, t)); 
1743     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1744     /*
1745       ngoq Dogh!
1746       ASSERT(procStatus[CurrentProc]==Busy || 
1747       ((procStatus[CurrentProc]==Fetching) && 
1748       (t->block_info.closure!=(StgClosure*)NULL)));
1749       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1750       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1751       procStatus[CurrentProc]==Fetching)) 
1752       procStatus[CurrentProc] = Idle;
1753     */
1754 #elif defined(PAR)
1755     IF_DEBUG(scheduler,
1756              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1757                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1758     IF_PAR_DEBUG(bq,
1759                  
1760                  if (t->block_info.closure!=(StgClosure*)NULL) 
1761                  print_bq(t->block_info.closure));
1762     
1763     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1764     blockThread(t);
1765     
1766     /* whatever we schedule next, we must log that schedule */
1767     emitSchedule = rtsTrue;
1768     
1769 #else /* !GRAN */
1770
1771       // We don't need to do anything.  The thread is blocked, and it
1772       // has tidied up its stack and placed itself on whatever queue
1773       // it needs to be on.
1774
1775     // ASSERT(t->why_blocked != NotBlocked);
1776     // Not true: for example,
1777     //    - in THREADED_RTS, the thread may already have been woken
1778     //      up by another Capability.  This actually happens: try
1779     //      conc023 +RTS -N2.
1780     //    - the thread may have woken itself up already, because
1781     //      threadPaused() might have raised a blocked throwTo
1782     //      exception, see maybePerformBlockedException().
1783
1784 #ifdef DEBUG
1785     if (traceClass(DEBUG_sched)) {
1786         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1787                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1788         printThreadBlockage(t);
1789         debugTraceEnd();
1790     }
1791 #endif
1792     
1793     /* Only for dumping event to log file 
1794        ToDo: do I need this in GranSim, too?
1795        blockThread(t);
1796     */
1797 #endif
1798 }
1799
1800 /* -----------------------------------------------------------------------------
1801  * Handle a thread that returned to the scheduler with ThreadFinished
1802  * -------------------------------------------------------------------------- */
1803
1804 static rtsBool
1805 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1806 {
1807     /* Need to check whether this was a main thread, and if so,
1808      * return with the return value.
1809      *
1810      * We also end up here if the thread kills itself with an
1811      * uncaught exception, see Exception.cmm.
1812      */
1813     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1814                (unsigned long)t->id, whatNext_strs[t->what_next]);
1815
1816 #if defined(GRAN)
1817       endThread(t, CurrentProc); // clean-up the thread
1818 #elif defined(PARALLEL_HASKELL)
1819       /* For now all are advisory -- HWL */
1820       //if(t->priority==AdvisoryPriority) ??
1821       advisory_thread_count--; // JB: Caution with this counter, buggy!
1822       
1823 # if defined(DIST)
1824       if(t->dist.priority==RevalPriority)
1825         FinishReval(t);
1826 # endif
1827     
1828 # if defined(EDENOLD)
1829       // the thread could still have an outport... (BUG)
1830       if (t->eden.outport != -1) {
1831       // delete the outport for the tso which has finished...
1832         IF_PAR_DEBUG(eden_ports,
1833                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1834                               t->eden.outport, t->id));
1835         deleteOPT(t);
1836       }
1837       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1838       if (t->eden.epid != -1) {
1839         IF_PAR_DEBUG(eden_ports,
1840                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1841                            t->id, t->eden.epid));
1842         removeTSOfromProcess(t);
1843       }
1844 # endif 
1845
1846 # if defined(PAR)
1847       if (RtsFlags.ParFlags.ParStats.Full &&
1848           !RtsFlags.ParFlags.ParStats.Suppressed) 
1849         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1850
1851       //  t->par only contains statistics: left out for now...
1852       IF_PAR_DEBUG(fish,
1853                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1854                               t->id,t,t->par.sparkname));
1855 # endif
1856 #endif // PARALLEL_HASKELL
1857
1858       //
1859       // Check whether the thread that just completed was a bound
1860       // thread, and if so return with the result.  
1861       //
1862       // There is an assumption here that all thread completion goes
1863       // through this point; we need to make sure that if a thread
1864       // ends up in the ThreadKilled state, that it stays on the run
1865       // queue so it can be dealt with here.
1866       //
1867
1868       if (t->bound) {
1869
1870           if (t->bound != task) {
1871 #if !defined(THREADED_RTS)
1872               // Must be a bound thread that is not the topmost one.  Leave
1873               // it on the run queue until the stack has unwound to the
1874               // point where we can deal with this.  Leaving it on the run
1875               // queue also ensures that the garbage collector knows about
1876               // this thread and its return value (it gets dropped from the
1877               // all_threads list so there's no other way to find it).
1878               appendToRunQueue(cap,t);
1879               return rtsFalse;
1880 #else
1881               // this cannot happen in the threaded RTS, because a
1882               // bound thread can only be run by the appropriate Task.
1883               barf("finished bound thread that isn't mine");
1884 #endif
1885           }
1886
1887           ASSERT(task->tso == t);
1888
1889           if (t->what_next == ThreadComplete) {
1890               if (task->ret) {
1891                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1892                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1893               }
1894               task->stat = Success;
1895           } else {
1896               if (task->ret) {
1897                   *(task->ret) = NULL;
1898               }
1899               if (sched_state >= SCHED_INTERRUPTING) {
1900                   task->stat = Interrupted;
1901               } else {
1902                   task->stat = Killed;
1903               }
1904           }
1905 #ifdef DEBUG
1906           removeThreadLabel((StgWord)task->tso->id);
1907 #endif
1908           return rtsTrue; // tells schedule() to return
1909       }
1910
1911       return rtsFalse;
1912 }
1913
1914 /* -----------------------------------------------------------------------------
1915  * Perform a heap census
1916  * -------------------------------------------------------------------------- */
1917
1918 static rtsBool
1919 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1920 {
1921     // When we have +RTS -i0 and we're heap profiling, do a census at
1922     // every GC.  This lets us get repeatable runs for debugging.
1923     if (performHeapProfile ||
1924         (RtsFlags.ProfFlags.profileInterval==0 &&
1925          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1926         return rtsTrue;
1927     } else {
1928         return rtsFalse;
1929     }
1930 }
1931
1932 /* -----------------------------------------------------------------------------
1933  * Perform a garbage collection if necessary
1934  * -------------------------------------------------------------------------- */
1935
1936 static Capability *
1937 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1938 {
1939     StgTSO *t;
1940     rtsBool heap_census;
1941 #ifdef THREADED_RTS
1942     static volatile StgWord waiting_for_gc;
1943     rtsBool was_waiting;
1944     nat i;
1945 #endif
1946
1947 #ifdef THREADED_RTS
1948     // In order to GC, there must be no threads running Haskell code.
1949     // Therefore, the GC thread needs to hold *all* the capabilities,
1950     // and release them after the GC has completed.  
1951     //
1952     // This seems to be the simplest way: previous attempts involved
1953     // making all the threads with capabilities give up their
1954     // capabilities and sleep except for the *last* one, which
1955     // actually did the GC.  But it's quite hard to arrange for all
1956     // the other tasks to sleep and stay asleep.
1957     //
1958         
1959     was_waiting = cas(&waiting_for_gc, 0, 1);
1960     if (was_waiting) {
1961         do {
1962             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1963             if (cap) yieldCapability(&cap,task);
1964         } while (waiting_for_gc);
1965         return cap;  // NOTE: task->cap might have changed here
1966     }
1967
1968     for (i=0; i < n_capabilities; i++) {
1969         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1970         if (cap != &capabilities[i]) {
1971             Capability *pcap = &capabilities[i];
1972             // we better hope this task doesn't get migrated to
1973             // another Capability while we're waiting for this one.
1974             // It won't, because load balancing happens while we have
1975             // all the Capabilities, but even so it's a slightly
1976             // unsavoury invariant.
1977             task->cap = pcap;
1978             context_switch = 1;
1979             waitForReturnCapability(&pcap, task);
1980             if (pcap != &capabilities[i]) {
1981                 barf("scheduleDoGC: got the wrong capability");
1982             }
1983         }
1984     }
1985
1986     waiting_for_gc = rtsFalse;
1987 #endif
1988
1989     /* Kick any transactions which are invalid back to their
1990      * atomically frames.  When next scheduled they will try to
1991      * commit, this commit will fail and they will retry.
1992      */
1993     { 
1994         StgTSO *next;
1995
1996         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1997             if (t->what_next == ThreadRelocated) {
1998                 next = t->link;
1999             } else {
2000                 next = t->global_link;
2001                 
2002                 // This is a good place to check for blocked
2003                 // exceptions.  It might be the case that a thread is
2004                 // blocked on delivering an exception to a thread that
2005                 // is also blocked - we try to ensure that this
2006                 // doesn't happen in throwTo(), but it's too hard (or
2007                 // impossible) to close all the race holes, so we
2008                 // accept that some might get through and deal with
2009                 // them here.  A GC will always happen at some point,
2010                 // even if the system is otherwise deadlocked.
2011                 maybePerformBlockedException (&capabilities[0], t);
2012
2013                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2014                     if (!stmValidateNestOfTransactions (t -> trec)) {
2015                         debugTrace(DEBUG_sched | DEBUG_stm,
2016                                    "trec %p found wasting its time", t);
2017                         
2018                         // strip the stack back to the
2019                         // ATOMICALLY_FRAME, aborting the (nested)
2020                         // transaction, and saving the stack of any
2021                         // partially-evaluated thunks on the heap.
2022                         throwToSingleThreaded_(&capabilities[0], t, 
2023                                                NULL, rtsTrue, NULL);
2024                         
2025 #ifdef REG_R1
2026                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2027 #endif
2028                     }
2029                 }
2030             }
2031         }
2032     }
2033     
2034     // so this happens periodically:
2035     if (cap) scheduleCheckBlackHoles(cap);
2036     
2037     IF_DEBUG(scheduler, printAllThreads());
2038
2039     /*
2040      * We now have all the capabilities; if we're in an interrupting
2041      * state, then we should take the opportunity to delete all the
2042      * threads in the system.
2043      */
2044     if (sched_state >= SCHED_INTERRUPTING) {
2045         deleteAllThreads(&capabilities[0]);
2046         sched_state = SCHED_SHUTTING_DOWN;
2047     }
2048     
2049     heap_census = scheduleNeedHeapProfile(rtsTrue);
2050
2051     /* everybody back, start the GC.
2052      * Could do it in this thread, or signal a condition var
2053      * to do it in another thread.  Either way, we need to
2054      * broadcast on gc_pending_cond afterward.
2055      */
2056 #if defined(THREADED_RTS)
2057     debugTrace(DEBUG_sched, "doing GC");
2058 #endif
2059     GarbageCollect(force_major || heap_census);
2060     
2061     if (heap_census) {
2062         debugTrace(DEBUG_sched, "performing heap census");
2063         heapCensus();
2064         performHeapProfile = rtsFalse;
2065     }
2066
2067 #if defined(THREADED_RTS)
2068     // release our stash of capabilities.
2069     for (i = 0; i < n_capabilities; i++) {
2070         if (cap != &capabilities[i]) {
2071             task->cap = &capabilities[i];
2072             releaseCapability(&capabilities[i]);
2073         }
2074     }
2075     if (cap) {
2076         task->cap = cap;
2077     } else {
2078         task->cap = NULL;
2079     }
2080 #endif
2081
2082 #if defined(GRAN)
2083     /* add a ContinueThread event to continue execution of current thread */
2084     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2085               ContinueThread,
2086               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2087     IF_GRAN_DEBUG(bq, 
2088                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2089                   G_EVENTQ(0);
2090                   G_CURR_THREADQ(0));
2091 #endif /* GRAN */
2092
2093     return cap;
2094 }
2095
2096 /* ---------------------------------------------------------------------------
2097  * Singleton fork(). Do not copy any running threads.
2098  * ------------------------------------------------------------------------- */
2099
2100 pid_t
2101 forkProcess(HsStablePtr *entry
2102 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2103             STG_UNUSED
2104 #endif
2105            )
2106 {
2107 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2108     Task *task;
2109     pid_t pid;
2110     StgTSO* t,*next;
2111     Capability *cap;
2112     
2113 #if defined(THREADED_RTS)
2114     if (RtsFlags.ParFlags.nNodes > 1) {
2115         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2116         stg_exit(EXIT_FAILURE);
2117     }
2118 #endif
2119
2120     debugTrace(DEBUG_sched, "forking!");
2121     
2122     // ToDo: for SMP, we should probably acquire *all* the capabilities
2123     cap = rts_lock();
2124     
2125     pid = fork();
2126     
2127     if (pid) { // parent
2128         
2129         // just return the pid
2130         rts_unlock(cap);
2131         return pid;
2132         
2133     } else { // child
2134         
2135         // Now, all OS threads except the thread that forked are
2136         // stopped.  We need to stop all Haskell threads, including
2137         // those involved in foreign calls.  Also we need to delete
2138         // all Tasks, because they correspond to OS threads that are
2139         // now gone.
2140
2141         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2142             if (t->what_next == ThreadRelocated) {
2143                 next = t->link;
2144             } else {
2145                 next = t->global_link;
2146                 // don't allow threads to catch the ThreadKilled
2147                 // exception, but we do want to raiseAsync() because these
2148                 // threads may be evaluating thunks that we need later.
2149                 deleteThread_(cap,t);
2150             }
2151         }
2152         
2153         // Empty the run queue.  It seems tempting to let all the
2154         // killed threads stay on the run queue as zombies to be
2155         // cleaned up later, but some of them correspond to bound
2156         // threads for which the corresponding Task does not exist.
2157         cap->run_queue_hd = END_TSO_QUEUE;
2158         cap->run_queue_tl = END_TSO_QUEUE;
2159
2160         // Any suspended C-calling Tasks are no more, their OS threads
2161         // don't exist now:
2162         cap->suspended_ccalling_tasks = NULL;
2163
2164         // Empty the all_threads list.  Otherwise, the garbage
2165         // collector may attempt to resurrect some of these threads.
2166         all_threads = END_TSO_QUEUE;
2167
2168         // Wipe the task list, except the current Task.
2169         ACQUIRE_LOCK(&sched_mutex);
2170         for (task = all_tasks; task != NULL; task=task->all_link) {
2171             if (task != cap->running_task) {
2172                 discardTask(task);
2173             }
2174         }
2175         RELEASE_LOCK(&sched_mutex);
2176
2177 #if defined(THREADED_RTS)
2178         // Wipe our spare workers list, they no longer exist.  New
2179         // workers will be created if necessary.
2180         cap->spare_workers = NULL;
2181         cap->returning_tasks_hd = NULL;
2182         cap->returning_tasks_tl = NULL;
2183 #endif
2184
2185         // On Unix, all timers are reset in the child, so we need to start
2186         // the timer again.
2187         startTimer();
2188
2189         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2190         rts_checkSchedStatus("forkProcess",cap);
2191         
2192         rts_unlock(cap);
2193         hs_exit();                      // clean up and exit
2194         stg_exit(EXIT_SUCCESS);
2195     }
2196 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2197     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2198     return -1;
2199 #endif
2200 }
2201
2202 /* ---------------------------------------------------------------------------
2203  * Delete all the threads in the system
2204  * ------------------------------------------------------------------------- */
2205    
2206 static void
2207 deleteAllThreads ( Capability *cap )
2208 {
2209     // NOTE: only safe to call if we own all capabilities.
2210
2211     StgTSO* t, *next;
2212     debugTrace(DEBUG_sched,"deleting all threads");
2213     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2214         if (t->what_next == ThreadRelocated) {
2215             next = t->link;
2216         } else {
2217             next = t->global_link;
2218             deleteThread(cap,t);
2219         }
2220     }      
2221
2222     // The run queue now contains a bunch of ThreadKilled threads.  We
2223     // must not throw these away: the main thread(s) will be in there
2224     // somewhere, and the main scheduler loop has to deal with it.
2225     // Also, the run queue is the only thing keeping these threads from
2226     // being GC'd, and we don't want the "main thread has been GC'd" panic.
2227
2228 #if !defined(THREADED_RTS)
2229     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2230     ASSERT(sleeping_queue == END_TSO_QUEUE);
2231 #endif
2232 }
2233
2234 /* -----------------------------------------------------------------------------
2235    Managing the suspended_ccalling_tasks list.
2236    Locks required: sched_mutex
2237    -------------------------------------------------------------------------- */
2238
2239 STATIC_INLINE void
2240 suspendTask (Capability *cap, Task *task)
2241 {
2242     ASSERT(task->next == NULL && task->prev == NULL);
2243     task->next = cap->suspended_ccalling_tasks;
2244     task->prev = NULL;
2245     if (cap->suspended_ccalling_tasks) {
2246         cap->suspended_ccalling_tasks->prev = task;
2247     }
2248     cap->suspended_ccalling_tasks = task;
2249 }
2250
2251 STATIC_INLINE void
2252 recoverSuspendedTask (Capability *cap, Task *task)
2253 {
2254     if (task->prev) {
2255         task->prev->next = task->next;
2256     } else {
2257         ASSERT(cap->suspended_ccalling_tasks == task);
2258         cap->suspended_ccalling_tasks = task->next;
2259     }
2260     if (task->next) {
2261         task->next->prev = task->prev;
2262     }
2263     task->next = task->prev = NULL;
2264 }
2265
2266 /* ---------------------------------------------------------------------------
2267  * Suspending & resuming Haskell threads.
2268  * 
2269  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2270  * its capability before calling the C function.  This allows another
2271  * task to pick up the capability and carry on running Haskell
2272  * threads.  It also means that if the C call blocks, it won't lock
2273  * the whole system.
2274  *
2275  * The Haskell thread making the C call is put to sleep for the
2276  * duration of the call, on the susepended_ccalling_threads queue.  We
2277  * give out a token to the task, which it can use to resume the thread
2278  * on return from the C function.
2279  * ------------------------------------------------------------------------- */
2280    
2281 void *
2282 suspendThread (StgRegTable *reg)
2283 {
2284   Capability *cap;
2285   int saved_errno;
2286   StgTSO *tso;
2287   Task *task;
2288 #if mingw32_HOST_OS
2289   StgWord32 saved_winerror;
2290 #endif
2291
2292   saved_errno = errno;
2293 #if mingw32_HOST_OS
2294   saved_winerror = GetLastError();
2295 #endif
2296
2297   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2298    */
2299   cap = regTableToCapability(reg);
2300
2301   task = cap->running_task;
2302   tso = cap->r.rCurrentTSO;
2303
2304   debugTrace(DEBUG_sched, 
2305              "thread %lu did a safe foreign call", 
2306              (unsigned long)cap->r.rCurrentTSO->id);
2307
2308   // XXX this might not be necessary --SDM
2309   tso->what_next = ThreadRunGHC;
2310
2311   threadPaused(cap,tso);
2312
2313   if ((tso->flags & TSO_BLOCKEX) == 0)  {
2314       tso->why_blocked = BlockedOnCCall;
2315       tso->flags |= TSO_BLOCKEX;
2316       tso->flags &= ~TSO_INTERRUPTIBLE;
2317   } else {
2318       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2319   }
2320
2321   // Hand back capability
2322   task->suspended_tso = tso;
2323
2324   ACQUIRE_LOCK(&cap->lock);
2325
2326   suspendTask(cap,task);
2327   cap->in_haskell = rtsFalse;
2328   releaseCapability_(cap);
2329   
2330   RELEASE_LOCK(&cap->lock);
2331
2332 #if defined(THREADED_RTS)
2333   /* Preparing to leave the RTS, so ensure there's a native thread/task
2334      waiting to take over.
2335   */
2336   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2337 #endif
2338
2339   errno = saved_errno;
2340 #if mingw32_HOST_OS
2341   SetLastError(saved_winerror);
2342 #endif
2343   return task;
2344 }
2345
2346 StgRegTable *
2347 resumeThread (void *task_)
2348 {
2349     StgTSO *tso;
2350     Capability *cap;
2351     Task *task = task_;
2352     int saved_errno;
2353 #if mingw32_HOST_OS
2354     StgWord32 saved_winerror;
2355 #endif
2356
2357     saved_errno = errno;
2358 #if mingw32_HOST_OS
2359     saved_winerror = GetLastError();
2360 #endif
2361
2362     cap = task->cap;
2363     // Wait for permission to re-enter the RTS with the result.
2364     waitForReturnCapability(&cap,task);
2365     // we might be on a different capability now... but if so, our
2366     // entry on the suspended_ccalling_tasks list will also have been
2367     // migrated.
2368
2369     // Remove the thread from the suspended list
2370     recoverSuspendedTask(cap,task);
2371
2372     tso = task->suspended_tso;
2373     task->suspended_tso = NULL;
2374     tso->link = END_TSO_QUEUE;
2375     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2376     
2377     if (tso->why_blocked == BlockedOnCCall) {
2378         awakenBlockedExceptionQueue(cap,tso);
2379         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2380     }
2381     
2382     /* Reset blocking status */
2383     tso->why_blocked  = NotBlocked;
2384     
2385     cap->r.rCurrentTSO = tso;
2386     cap->in_haskell = rtsTrue;
2387     errno = saved_errno;
2388 #if mingw32_HOST_OS
2389     SetLastError(saved_winerror);
2390 #endif
2391
2392     /* We might have GC'd, mark the TSO dirty again */
2393     dirtyTSO(tso);
2394
2395     IF_DEBUG(sanity, checkTSO(tso));
2396
2397     return &cap->r;
2398 }
2399
2400 /* ---------------------------------------------------------------------------
2401  * scheduleThread()
2402  *
2403  * scheduleThread puts a thread on the end  of the runnable queue.
2404  * This will usually be done immediately after a thread is created.
2405  * The caller of scheduleThread must create the thread using e.g.
2406  * createThread and push an appropriate closure
2407  * on this thread's stack before the scheduler is invoked.
2408  * ------------------------------------------------------------------------ */
2409
2410 void
2411 scheduleThread(Capability *cap, StgTSO *tso)
2412 {
2413     // The thread goes at the *end* of the run-queue, to avoid possible
2414     // starvation of any threads already on the queue.
2415     appendToRunQueue(cap,tso);
2416 }
2417
2418 void
2419 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2420 {
2421 #if defined(THREADED_RTS)
2422     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2423                               // move this thread from now on.
2424     cpu %= RtsFlags.ParFlags.nNodes;
2425     if (cpu == cap->no) {
2426         appendToRunQueue(cap,tso);
2427     } else {
2428         migrateThreadToCapability_lock(&capabilities[cpu],tso);
2429     }
2430 #else
2431     appendToRunQueue(cap,tso);
2432 #endif
2433 }
2434
2435 Capability *
2436 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2437 {
2438     Task *task;
2439
2440     // We already created/initialised the Task
2441     task = cap->running_task;
2442
2443     // This TSO is now a bound thread; make the Task and TSO
2444     // point to each other.
2445     tso->bound = task;
2446     tso->cap = cap;
2447
2448     task->tso = tso;
2449     task->ret = ret;
2450     task->stat = NoStatus;
2451
2452     appendToRunQueue(cap,tso);
2453
2454     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2455
2456 #if defined(GRAN)
2457     /* GranSim specific init */
2458     CurrentTSO = m->tso;                // the TSO to run
2459     procStatus[MainProc] = Busy;        // status of main PE
2460     CurrentProc = MainProc;             // PE to run it on
2461 #endif
2462
2463     cap = schedule(cap,task);
2464
2465     ASSERT(task->stat != NoStatus);
2466     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2467
2468     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2469     return cap;
2470 }
2471
2472 /* ----------------------------------------------------------------------------
2473  * Starting Tasks
2474  * ------------------------------------------------------------------------- */
2475
2476 #if defined(THREADED_RTS)
2477 void
2478 workerStart(Task *task)
2479 {
2480     Capability *cap;
2481
2482     // See startWorkerTask().
2483     ACQUIRE_LOCK(&task->lock);
2484     cap = task->cap;
2485     RELEASE_LOCK(&task->lock);
2486
2487     // set the thread-local pointer to the Task:
2488     taskEnter(task);
2489
2490     // schedule() runs without a lock.
2491     cap = schedule(cap,task);
2492
2493     // On exit from schedule(), we have a Capability.
2494     releaseCapability(cap);
2495     workerTaskStop(task);
2496 }
2497 #endif
2498
2499 /* ---------------------------------------------------------------------------
2500  * initScheduler()
2501  *
2502  * Initialise the scheduler.  This resets all the queues - if the
2503  * queues contained any threads, they'll be garbage collected at the
2504  * next pass.
2505  *
2506  * ------------------------------------------------------------------------ */
2507
2508 void 
2509 initScheduler(void)
2510 {
2511 #if defined(GRAN)
2512   nat i;
2513   for (i=0; i<=MAX_PROC; i++) {
2514     run_queue_hds[i]      = END_TSO_QUEUE;
2515     run_queue_tls[i]      = END_TSO_QUEUE;
2516     blocked_queue_hds[i]  = END_TSO_QUEUE;
2517     blocked_queue_tls[i]  = END_TSO_QUEUE;
2518     ccalling_threadss[i]  = END_TSO_QUEUE;
2519     blackhole_queue[i]    = END_TSO_QUEUE;
2520     sleeping_queue        = END_TSO_QUEUE;
2521   }
2522 #elif !defined(THREADED_RTS)
2523   blocked_queue_hd  = END_TSO_QUEUE;
2524   blocked_queue_tl  = END_TSO_QUEUE;
2525   sleeping_queue    = END_TSO_QUEUE;
2526 #endif
2527
2528   blackhole_queue   = END_TSO_QUEUE;
2529   all_threads       = END_TSO_QUEUE;
2530
2531   context_switch = 0;
2532   sched_state    = SCHED_RUNNING;
2533
2534 #if defined(THREADED_RTS)
2535   /* Initialise the mutex and condition variables used by
2536    * the scheduler. */
2537   initMutex(&sched_mutex);
2538 #endif
2539   
2540   ACQUIRE_LOCK(&sched_mutex);
2541
2542   /* A capability holds the state a native thread needs in
2543    * order to execute STG code. At least one capability is
2544    * floating around (only THREADED_RTS builds have more than one).
2545    */
2546   initCapabilities();
2547
2548   initTaskManager();
2549
2550 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2551   initSparkPools();
2552 #endif
2553
2554 #if defined(THREADED_RTS)
2555   /*
2556    * Eagerly start one worker to run each Capability, except for
2557    * Capability 0.  The idea is that we're probably going to start a
2558    * bound thread on Capability 0 pretty soon, so we don't want a
2559    * worker task hogging it.
2560    */
2561   { 
2562       nat i;
2563       Capability *cap;
2564       for (i = 1; i < n_capabilities; i++) {
2565           cap = &capabilities[i];
2566           ACQUIRE_LOCK(&cap->lock);
2567           startWorkerTask(cap, workerStart);
2568           RELEASE_LOCK(&cap->lock);
2569       }
2570   }
2571 #endif
2572
2573   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2574
2575   RELEASE_LOCK(&sched_mutex);
2576 }
2577
2578 void
2579 exitScheduler( void )
2580 {
2581     Task *task = NULL;
2582
2583 #if defined(THREADED_RTS)
2584     ACQUIRE_LOCK(&sched_mutex);
2585     task = newBoundTask();
2586     RELEASE_LOCK(&sched_mutex);
2587 #endif
2588
2589     // If we haven't killed all the threads yet, do it now.
2590     if (sched_state < SCHED_SHUTTING_DOWN) {
2591         sched_state = SCHED_INTERRUPTING;
2592         scheduleDoGC(NULL,task,rtsFalse);    
2593     }
2594     sched_state = SCHED_SHUTTING_DOWN;
2595
2596 #if defined(THREADED_RTS)
2597     { 
2598         nat i;
2599         
2600         for (i = 0; i < n_capabilities; i++) {
2601             shutdownCapability(&capabilities[i], task);
2602         }
2603         boundTaskExiting(task);
2604         stopTaskManager();
2605     }
2606 #else
2607     freeCapability(&MainCapability);
2608 #endif
2609 }
2610
2611 void
2612 freeScheduler( void )
2613 {
2614     freeTaskManager();
2615     if (n_capabilities != 1) {
2616         stgFree(capabilities);
2617     }
2618 #if defined(THREADED_RTS)
2619     closeMutex(&sched_mutex);
2620 #endif
2621 }
2622
2623 /* ---------------------------------------------------------------------------
2624    Where are the roots that we know about?
2625
2626         - all the threads on the runnable queue
2627         - all the threads on the blocked queue
2628         - all the threads on the sleeping queue
2629         - all the thread currently executing a _ccall_GC
2630         - all the "main threads"
2631      
2632    ------------------------------------------------------------------------ */
2633
2634 /* This has to be protected either by the scheduler monitor, or by the
2635         garbage collection monitor (probably the latter).
2636         KH @ 25/10/99
2637 */
2638
2639 void
2640 GetRoots( evac_fn evac )
2641 {
2642     nat i;
2643     Capability *cap;
2644     Task *task;
2645
2646 #if defined(GRAN)
2647     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2648         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2649             evac((StgClosure **)&run_queue_hds[i]);
2650         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2651             evac((StgClosure **)&run_queue_tls[i]);
2652         
2653         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2654             evac((StgClosure **)&blocked_queue_hds[i]);
2655         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2656             evac((StgClosure **)&blocked_queue_tls[i]);
2657         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2658             evac((StgClosure **)&ccalling_threads[i]);
2659     }
2660
2661     markEventQueue();
2662
2663 #else /* !GRAN */
2664
2665     for (i = 0; i < n_capabilities; i++) {
2666         cap = &capabilities[i];
2667         evac((StgClosure **)(void *)&cap->run_queue_hd);
2668         evac((StgClosure **)(void *)&cap->run_queue_tl);
2669 #if defined(THREADED_RTS)
2670         evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2671         evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2672 #endif
2673         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2674              task=task->next) {
2675             debugTrace(DEBUG_sched,
2676                        "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
2677             evac((StgClosure **)(void *)&task->suspended_tso);
2678         }
2679
2680     }
2681     
2682
2683 #if !defined(THREADED_RTS)
2684     evac((StgClosure **)(void *)&blocked_queue_hd);
2685     evac((StgClosure **)(void *)&blocked_queue_tl);
2686     evac((StgClosure **)(void *)&sleeping_queue);
2687 #endif 
2688 #endif
2689
2690     // evac((StgClosure **)&blackhole_queue);
2691
2692 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2693     markSparkQueue(evac);
2694 #endif
2695     
2696 #if defined(RTS_USER_SIGNALS)
2697     // mark the signal handlers (signals should be already blocked)
2698     if (RtsFlags.MiscFlags.install_signal_handlers) {
2699         markSignalHandlers(evac);
2700     }
2701 #endif
2702 }
2703
2704 /* -----------------------------------------------------------------------------
2705    performGC
2706
2707    This is the interface to the garbage collector from Haskell land.
2708    We provide this so that external C code can allocate and garbage
2709    collect when called from Haskell via _ccall_GC.
2710    -------------------------------------------------------------------------- */
2711
2712 static void
2713 performGC_(rtsBool force_major)
2714 {
2715     Task *task;
2716     // We must grab a new Task here, because the existing Task may be
2717     // associated with a particular Capability, and chained onto the 
2718     // suspended_ccalling_tasks queue.
2719     ACQUIRE_LOCK(&sched_mutex);
2720     task = newBoundTask();
2721     RELEASE_LOCK(&sched_mutex);
2722     scheduleDoGC(NULL,task,force_major);
2723     boundTaskExiting(task);
2724 }
2725
2726 void
2727 performGC(void)
2728 {
2729     performGC_(rtsFalse);
2730 }
2731
2732 void
2733 performMajorGC(void)
2734 {
2735     performGC_(rtsTrue);
2736 }
2737
2738 /* -----------------------------------------------------------------------------
2739    Stack overflow
2740
2741    If the thread has reached its maximum stack size, then raise the
2742    StackOverflow exception in the offending thread.  Otherwise
2743    relocate the TSO into a larger chunk of memory and adjust its stack
2744    size appropriately.
2745    -------------------------------------------------------------------------- */
2746
2747 static StgTSO *
2748 threadStackOverflow(Capability *cap, StgTSO *tso)
2749 {
2750   nat new_stack_size, stack_words;
2751   lnat new_tso_size;
2752   StgPtr new_sp;
2753   StgTSO *dest;
2754
2755   IF_DEBUG(sanity,checkTSO(tso));
2756
2757   // don't allow throwTo() to modify the blocked_exceptions queue
2758   // while we are moving the TSO:
2759   lockClosure((StgClosure *)tso);
2760
2761   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2762       // NB. never raise a StackOverflow exception if the thread is
2763       // inside Control.Exceptino.block.  It is impractical to protect
2764       // against stack overflow exceptions, since virtually anything
2765       // can raise one (even 'catch'), so this is the only sensible
2766       // thing to do here.  See bug #767.
2767
2768       debugTrace(DEBUG_gc,
2769                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2770                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2771       IF_DEBUG(gc,
2772                /* If we're debugging, just print out the top of the stack */
2773                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2774                                                 tso->sp+64)));
2775
2776       // Send this thread the StackOverflow exception
2777       unlockTSO(tso);
2778       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2779       return tso;
2780   }
2781
2782   /* Try to double the current stack size.  If that takes us over the
2783    * maximum stack size for this thread, then use the maximum instead.
2784    * Finally round up so the TSO ends up as a whole number of blocks.
2785    */
2786   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2787   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2788                                        TSO_STRUCT_SIZE)/sizeof(W_);
2789   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2790   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2791
2792   debugTrace(DEBUG_sched, 
2793              "increasing stack size from %ld words to %d.",
2794              (long)tso->stack_size, new_stack_size);
2795
2796   dest = (StgTSO *)allocate(new_tso_size);
2797   TICK_ALLOC_TSO(new_stack_size,0);
2798
2799   /* copy the TSO block and the old stack into the new area */
2800   memcpy(dest,tso,TSO_STRUCT_SIZE);
2801   stack_words = tso->stack + tso->stack_size - tso->sp;
2802   new_sp = (P_)dest + new_tso_size - stack_words;
2803   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2804
2805   /* relocate the stack pointers... */
2806   dest->sp         = new_sp;
2807   dest->stack_size = new_stack_size;
2808         
2809   /* Mark the old TSO as relocated.  We have to check for relocated
2810    * TSOs in the garbage collector and any primops that deal with TSOs.
2811    *
2812    * It's important to set the sp value to just beyond the end
2813    * of the stack, so we don't attempt to scavenge any part of the
2814    * dead TSO's stack.
2815    */
2816   tso->what_next = ThreadRelocated;
2817   tso->link = dest;
2818   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2819   tso->why_blocked = NotBlocked;
2820
2821   IF_PAR_DEBUG(verbose,
2822                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2823                      tso->id, tso, tso->stack_size);
2824                /* If we're debugging, just print out the top of the stack */
2825                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2826                                                 tso->sp+64)));
2827   
2828   unlockTSO(dest);
2829   unlockTSO(tso);
2830
2831   IF_DEBUG(sanity,checkTSO(dest));
2832 #if 0
2833   IF_DEBUG(scheduler,printTSO(dest));
2834 #endif
2835
2836   return dest;
2837 }
2838
2839 /* ---------------------------------------------------------------------------
2840    Interrupt execution
2841    - usually called inside a signal handler so it mustn't do anything fancy.   
2842    ------------------------------------------------------------------------ */
2843
2844 void
2845 interruptStgRts(void)
2846 {
2847     sched_state = SCHED_INTERRUPTING;
2848     context_switch = 1;
2849     wakeUpRts();
2850 }
2851
2852 /* -----------------------------------------------------------------------------
2853    Wake up the RTS
2854    
2855    This function causes at least one OS thread to wake up and run the
2856    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2857    an external event has arrived that may need servicing (eg. a
2858    keyboard interrupt).
2859
2860    In the single-threaded RTS we don't do anything here; we only have
2861    one thread anyway, and the event that caused us to want to wake up
2862    will have interrupted any blocking system call in progress anyway.
2863    -------------------------------------------------------------------------- */
2864
2865 void
2866 wakeUpRts(void)
2867 {
2868 #if defined(THREADED_RTS)
2869     // This forces the IO Manager thread to wakeup, which will
2870     // in turn ensure that some OS thread wakes up and runs the
2871     // scheduler loop, which will cause a GC and deadlock check.
2872     ioManagerWakeup();
2873 #endif
2874 }
2875
2876 /* -----------------------------------------------------------------------------
2877  * checkBlackHoles()
2878  *
2879  * Check the blackhole_queue for threads that can be woken up.  We do
2880  * this periodically: before every GC, and whenever the run queue is
2881  * empty.
2882  *
2883  * An elegant solution might be to just wake up all the blocked
2884  * threads with awakenBlockedQueue occasionally: they'll go back to
2885  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2886  * doesn't give us a way to tell whether we've actually managed to
2887  * wake up any threads, so we would be busy-waiting.
2888  *
2889  * -------------------------------------------------------------------------- */
2890
2891 static rtsBool
2892 checkBlackHoles (Capability *cap)
2893 {
2894     StgTSO **prev, *t;
2895     rtsBool any_woke_up = rtsFalse;
2896     StgHalfWord type;
2897
2898     // blackhole_queue is global:
2899     ASSERT_LOCK_HELD(&sched_mutex);
2900
2901     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2902
2903     // ASSUMES: sched_mutex
2904     prev = &blackhole_queue;
2905     t = blackhole_queue;
2906     while (t != END_TSO_QUEUE) {
2907         ASSERT(t->why_blocked == BlockedOnBlackHole);
2908         type = get_itbl(t->block_info.closure)->type;
2909         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2910             IF_DEBUG(sanity,checkTSO(t));
2911             t = unblockOne(cap, t);
2912             // urk, the threads migrate to the current capability
2913             // here, but we'd like to keep them on the original one.
2914             *prev = t;
2915             any_woke_up = rtsTrue;
2916         } else {
2917             prev = &t->link;
2918             t = t->link;
2919         }
2920     }
2921
2922     return any_woke_up;
2923 }
2924
2925 /* -----------------------------------------------------------------------------
2926    Deleting threads
2927
2928    This is used for interruption (^C) and forking, and corresponds to
2929    raising an exception but without letting the thread catch the
2930    exception.
2931    -------------------------------------------------------------------------- */
2932
2933 static void 
2934 deleteThread (Capability *cap, StgTSO *tso)
2935 {
2936     // NOTE: must only be called on a TSO that we have exclusive
2937     // access to, because we will call throwToSingleThreaded() below.
2938     // The TSO must be on the run queue of the Capability we own, or 
2939     // we must own all Capabilities.
2940
2941     if (tso->why_blocked != BlockedOnCCall &&
2942         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2943         throwToSingleThreaded(cap,tso,NULL);
2944     }
2945 }
2946
2947 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2948 static void 
2949 deleteThread_(Capability *cap, StgTSO *tso)
2950 { // for forkProcess only:
2951   // like deleteThread(), but we delete threads in foreign calls, too.
2952
2953     if (tso->why_blocked == BlockedOnCCall ||
2954         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2955         unblockOne(cap,tso);
2956         tso->what_next = ThreadKilled;
2957     } else {
2958         deleteThread(cap,tso);
2959     }
2960 }
2961 #endif
2962
2963 /* -----------------------------------------------------------------------------
2964    raiseExceptionHelper
2965    
2966    This function is called by the raise# primitve, just so that we can
2967    move some of the tricky bits of raising an exception from C-- into
2968    C.  Who knows, it might be a useful re-useable thing here too.
2969    -------------------------------------------------------------------------- */
2970
2971 StgWord
2972 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2973 {
2974     Capability *cap = regTableToCapability(reg);
2975     StgThunk *raise_closure = NULL;
2976     StgPtr p, next;
2977     StgRetInfoTable *info;
2978     //
2979     // This closure represents the expression 'raise# E' where E
2980     // is the exception raise.  It is used to overwrite all the
2981     // thunks which are currently under evaluataion.
2982     //
2983
2984     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2985     // LDV profiling: stg_raise_info has THUNK as its closure
2986     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2987     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2988     // 1 does not cause any problem unless profiling is performed.
2989     // However, when LDV profiling goes on, we need to linearly scan
2990     // small object pool, where raise_closure is stored, so we should
2991     // use MIN_UPD_SIZE.
2992     //
2993     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2994     //                                 sizeofW(StgClosure)+1);
2995     //
2996
2997     //
2998     // Walk up the stack, looking for the catch frame.  On the way,
2999     // we update any closures pointed to from update frames with the
3000     // raise closure that we just built.
3001     //
3002     p = tso->sp;
3003     while(1) {
3004         info = get_ret_itbl((StgClosure *)p);
3005         next = p + stack_frame_sizeW((StgClosure *)p);
3006         switch (info->i.type) {
3007             
3008         case UPDATE_FRAME:
3009             // Only create raise_closure if we need to.
3010             if (raise_closure == NULL) {
3011                 raise_closure = 
3012                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3013                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3014                 raise_closure->payload[0] = exception;
3015             }
3016             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3017             p = next;
3018             continue;
3019
3020         case ATOMICALLY_FRAME:
3021             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
3022             tso->sp = p;
3023             return ATOMICALLY_FRAME;
3024             
3025         case CATCH_FRAME:
3026             tso->sp = p;
3027             return CATCH_FRAME;
3028
3029         case CATCH_STM_FRAME:
3030             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3031             tso->sp = p;
3032             return CATCH_STM_FRAME;
3033             
3034         case STOP_FRAME:
3035             tso->sp = p;
3036             return STOP_FRAME;
3037
3038         case CATCH_RETRY_FRAME:
3039         default:
3040             p = next; 
3041             continue;
3042         }
3043     }
3044 }
3045
3046
3047 /* -----------------------------------------------------------------------------
3048    findRetryFrameHelper
3049
3050    This function is called by the retry# primitive.  It traverses the stack
3051    leaving tso->sp referring to the frame which should handle the retry.  
3052
3053    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3054    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3055
3056    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3057    create) because retries are not considered to be exceptions, despite the
3058    similar implementation.
3059
3060    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3061    not be created within memory transactions.
3062    -------------------------------------------------------------------------- */
3063
3064 StgWord
3065 findRetryFrameHelper (StgTSO *tso)
3066 {
3067   StgPtr           p, next;
3068   StgRetInfoTable *info;
3069
3070   p = tso -> sp;
3071   while (1) {
3072     info = get_ret_itbl((StgClosure *)p);
3073     next = p + stack_frame_sizeW((StgClosure *)p);
3074     switch (info->i.type) {
3075       
3076     case ATOMICALLY_FRAME:
3077         debugTrace(DEBUG_stm,
3078                    "found ATOMICALLY_FRAME at %p during retry", p);
3079         tso->sp = p;
3080         return ATOMICALLY_FRAME;
3081       
3082     case CATCH_RETRY_FRAME:
3083         debugTrace(DEBUG_stm,
3084                    "found CATCH_RETRY_FRAME at %p during retrry", p);
3085         tso->sp = p;
3086         return CATCH_RETRY_FRAME;
3087       
3088     case CATCH_STM_FRAME: {
3089         debugTrace(DEBUG_stm,
3090                    "found CATCH_STM_FRAME at %p during retry", p);
3091         StgTRecHeader *trec = tso -> trec;
3092         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3093         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3094         stmAbortTransaction(tso -> cap, trec);
3095         stmFreeAbortedTRec(tso -> cap, trec);
3096         tso -> trec = outer;
3097         p = next; 
3098         continue;
3099     }
3100       
3101
3102     default:
3103       ASSERT(info->i.type != CATCH_FRAME);
3104       ASSERT(info->i.type != STOP_FRAME);
3105       p = next; 
3106       continue;
3107     }
3108   }
3109 }
3110
3111 /* -----------------------------------------------------------------------------
3112    resurrectThreads is called after garbage collection on the list of
3113    threads found to be garbage.  Each of these threads will be woken
3114    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3115    on an MVar, or NonTermination if the thread was blocked on a Black
3116    Hole.
3117
3118    Locks: assumes we hold *all* the capabilities.
3119    -------------------------------------------------------------------------- */
3120
3121 void
3122 resurrectThreads (StgTSO *threads)
3123 {
3124     StgTSO *tso, *next;
3125     Capability *cap;
3126
3127     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3128         next = tso->global_link;
3129         tso->global_link = all_threads;
3130         all_threads = tso;
3131         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3132         
3133         // Wake up the thread on the Capability it was last on
3134         cap = tso->cap;
3135         
3136         switch (tso->why_blocked) {
3137         case BlockedOnMVar:
3138         case BlockedOnException:
3139             /* Called by GC - sched_mutex lock is currently held. */
3140             throwToSingleThreaded(cap, tso,
3141                                   (StgClosure *)BlockedOnDeadMVar_closure);
3142             break;
3143         case BlockedOnBlackHole:
3144             throwToSingleThreaded(cap, tso,
3145                                   (StgClosure *)NonTermination_closure);
3146             break;
3147         case BlockedOnSTM:
3148             throwToSingleThreaded(cap, tso,
3149                                   (StgClosure *)BlockedIndefinitely_closure);
3150             break;
3151         case NotBlocked:
3152             /* This might happen if the thread was blocked on a black hole
3153              * belonging to a thread that we've just woken up (raiseAsync
3154              * can wake up threads, remember...).
3155              */
3156             continue;
3157         default:
3158             barf("resurrectThreads: thread blocked in a strange way");
3159         }
3160     }
3161 }