hs_exit()/shutdownHaskell(): wait for outstanding foreign calls to complete before...
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #if defined(GRAN) || defined(PARALLEL_HASKELL)
35 # include "GranSimRts.h"
36 # include "GranSim.h"
37 # include "ParallelRts.h"
38 # include "Parallel.h"
39 # include "ParallelDebug.h"
40 # include "FetchMe.h"
41 # include "HLC.h"
42 #endif
43 #include "Sparks.h"
44 #include "Capability.h"
45 #include "Task.h"
46 #include "AwaitEvent.h"
47 #if defined(mingw32_HOST_OS)
48 #include "win32/IOManager.h"
49 #endif
50 #include "Trace.h"
51 #include "RaiseAsync.h"
52 #include "Threads.h"
53 #include "ThrIOManager.h"
54
55 #ifdef HAVE_SYS_TYPES_H
56 #include <sys/types.h>
57 #endif
58 #ifdef HAVE_UNISTD_H
59 #include <unistd.h>
60 #endif
61
62 #include <string.h>
63 #include <stdlib.h>
64 #include <stdarg.h>
65
66 #ifdef HAVE_ERRNO_H
67 #include <errno.h>
68 #endif
69
70 // Turn off inlining when debugging - it obfuscates things
71 #ifdef DEBUG
72 # undef  STATIC_INLINE
73 # define STATIC_INLINE static
74 #endif
75
76 /* -----------------------------------------------------------------------------
77  * Global variables
78  * -------------------------------------------------------------------------- */
79
80 #if defined(GRAN)
81
82 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
83 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
84
85 /* 
86    In GranSim we have a runnable and a blocked queue for each processor.
87    In order to minimise code changes new arrays run_queue_hds/tls
88    are created. run_queue_hd is then a short cut (macro) for
89    run_queue_hds[CurrentProc] (see GranSim.h).
90    -- HWL
91 */
92 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
93 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
94 StgTSO *ccalling_threadss[MAX_PROC];
95 /* We use the same global list of threads (all_threads) in GranSim as in
96    the std RTS (i.e. we are cheating). However, we don't use this list in
97    the GranSim specific code at the moment (so we are only potentially
98    cheating).  */
99
100 #else /* !GRAN */
101
102 #if !defined(THREADED_RTS)
103 // Blocked/sleeping thrads
104 StgTSO *blocked_queue_hd = NULL;
105 StgTSO *blocked_queue_tl = NULL;
106 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
107 #endif
108
109 /* Threads blocked on blackholes.
110  * LOCK: sched_mutex+capability, or all capabilities
111  */
112 StgTSO *blackhole_queue = NULL;
113 #endif
114
115 /* The blackhole_queue should be checked for threads to wake up.  See
116  * Schedule.h for more thorough comment.
117  * LOCK: none (doesn't matter if we miss an update)
118  */
119 rtsBool blackholes_need_checking = rtsFalse;
120
121 /* Linked list of all threads.
122  * Used for detecting garbage collected threads.
123  * LOCK: sched_mutex+capability, or all capabilities
124  */
125 StgTSO *all_threads = NULL;
126
127 /* flag set by signal handler to precipitate a context switch
128  * LOCK: none (just an advisory flag)
129  */
130 int context_switch = 0;
131
132 /* flag that tracks whether we have done any execution in this time slice.
133  * LOCK: currently none, perhaps we should lock (but needs to be
134  * updated in the fast path of the scheduler).
135  */
136 nat recent_activity = ACTIVITY_YES;
137
138 /* if this flag is set as well, give up execution
139  * LOCK: none (changes once, from false->true)
140  */
141 rtsBool sched_state = SCHED_RUNNING;
142
143 #if defined(GRAN)
144 StgTSO *CurrentTSO;
145 #endif
146
147 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
148  *  exists - earlier gccs apparently didn't.
149  *  -= chak
150  */
151 StgTSO dummy_tso;
152
153 /*
154  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
155  * in an MT setting, needed to signal that a worker thread shouldn't hang around
156  * in the scheduler when it is out of work.
157  */
158 rtsBool shutting_down_scheduler = rtsFalse;
159
160 /*
161  * This mutex protects most of the global scheduler data in
162  * the THREADED_RTS runtime.
163  */
164 #if defined(THREADED_RTS)
165 Mutex sched_mutex;
166 #endif
167
168 #if defined(PARALLEL_HASKELL)
169 StgTSO *LastTSO;
170 rtsTime TimeOfLastYield;
171 rtsBool emitSchedule = rtsTrue;
172 #endif
173
174 #if !defined(mingw32_HOST_OS)
175 #define FORKPROCESS_PRIMOP_SUPPORTED
176 #endif
177
178 /* -----------------------------------------------------------------------------
179  * static function prototypes
180  * -------------------------------------------------------------------------- */
181
182 static Capability *schedule (Capability *initialCapability, Task *task);
183
184 //
185 // These function all encapsulate parts of the scheduler loop, and are
186 // abstracted only to make the structure and control flow of the
187 // scheduler clearer.
188 //
189 static void schedulePreLoop (void);
190 #if defined(THREADED_RTS)
191 static void schedulePushWork(Capability *cap, Task *task);
192 #endif
193 static void scheduleStartSignalHandlers (Capability *cap);
194 static void scheduleCheckBlockedThreads (Capability *cap);
195 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
196 static void scheduleCheckBlackHoles (Capability *cap);
197 static void scheduleDetectDeadlock (Capability *cap, Task *task);
198 #if defined(GRAN)
199 static StgTSO *scheduleProcessEvent(rtsEvent *event);
200 #endif
201 #if defined(PARALLEL_HASKELL)
202 static StgTSO *scheduleSendPendingMessages(void);
203 static void scheduleActivateSpark(void);
204 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
205 #endif
206 #if defined(PAR) || defined(GRAN)
207 static void scheduleGranParReport(void);
208 #endif
209 static void schedulePostRunThread(void);
210 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
211 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
212                                          StgTSO *t);
213 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
214                                     nat prev_what_next );
215 static void scheduleHandleThreadBlocked( StgTSO *t );
216 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
217                                              StgTSO *t );
218 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
219 static Capability *scheduleDoGC(Capability *cap, Task *task,
220                                 rtsBool force_major);
221
222 static rtsBool checkBlackHoles(Capability *cap);
223
224 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
225
226 static void deleteThread (Capability *cap, StgTSO *tso);
227 static void deleteAllThreads (Capability *cap);
228
229 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
230 static void deleteThread_(Capability *cap, StgTSO *tso);
231 #endif
232
233 #if defined(PARALLEL_HASKELL)
234 StgTSO * createSparkThread(rtsSpark spark);
235 StgTSO * activateSpark (rtsSpark spark);  
236 #endif
237
238 #ifdef DEBUG
239 static char *whatNext_strs[] = {
240   "(unknown)",
241   "ThreadRunGHC",
242   "ThreadInterpret",
243   "ThreadKilled",
244   "ThreadRelocated",
245   "ThreadComplete"
246 };
247 #endif
248
249 /* -----------------------------------------------------------------------------
250  * Putting a thread on the run queue: different scheduling policies
251  * -------------------------------------------------------------------------- */
252
253 STATIC_INLINE void
254 addToRunQueue( Capability *cap, StgTSO *t )
255 {
256 #if defined(PARALLEL_HASKELL)
257     if (RtsFlags.ParFlags.doFairScheduling) { 
258         // this does round-robin scheduling; good for concurrency
259         appendToRunQueue(cap,t);
260     } else {
261         // this does unfair scheduling; good for parallelism
262         pushOnRunQueue(cap,t);
263     }
264 #else
265     // this does round-robin scheduling; good for concurrency
266     appendToRunQueue(cap,t);
267 #endif
268 }
269
270 /* ---------------------------------------------------------------------------
271    Main scheduling loop.
272
273    We use round-robin scheduling, each thread returning to the
274    scheduler loop when one of these conditions is detected:
275
276       * out of heap space
277       * timer expires (thread yields)
278       * thread blocks
279       * thread ends
280       * stack overflow
281
282    GRAN version:
283      In a GranSim setup this loop iterates over the global event queue.
284      This revolves around the global event queue, which determines what 
285      to do next. Therefore, it's more complicated than either the 
286      concurrent or the parallel (GUM) setup.
287
288    GUM version:
289      GUM iterates over incoming messages.
290      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
291      and sends out a fish whenever it has nothing to do; in-between
292      doing the actual reductions (shared code below) it processes the
293      incoming messages and deals with delayed operations 
294      (see PendingFetches).
295      This is not the ugliest code you could imagine, but it's bloody close.
296
297    ------------------------------------------------------------------------ */
298
299 static Capability *
300 schedule (Capability *initialCapability, Task *task)
301 {
302   StgTSO *t;
303   Capability *cap;
304   StgThreadReturnCode ret;
305 #if defined(GRAN)
306   rtsEvent *event;
307 #elif defined(PARALLEL_HASKELL)
308   StgTSO *tso;
309   GlobalTaskId pe;
310   rtsBool receivedFinish = rtsFalse;
311 # if defined(DEBUG)
312   nat tp_size, sp_size; // stats only
313 # endif
314 #endif
315   nat prev_what_next;
316   rtsBool ready_to_gc;
317 #if defined(THREADED_RTS)
318   rtsBool first = rtsTrue;
319 #endif
320   
321   cap = initialCapability;
322
323   // Pre-condition: this task owns initialCapability.
324   // The sched_mutex is *NOT* held
325   // NB. on return, we still hold a capability.
326
327   debugTrace (DEBUG_sched, 
328               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
329               task, initialCapability);
330
331   schedulePreLoop();
332
333   // -----------------------------------------------------------
334   // Scheduler loop starts here:
335
336 #if defined(PARALLEL_HASKELL)
337 #define TERMINATION_CONDITION        (!receivedFinish)
338 #elif defined(GRAN)
339 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
340 #else
341 #define TERMINATION_CONDITION        rtsTrue
342 #endif
343
344   while (TERMINATION_CONDITION) {
345
346 #if defined(GRAN)
347       /* Choose the processor with the next event */
348       CurrentProc = event->proc;
349       CurrentTSO = event->tso;
350 #endif
351
352 #if defined(THREADED_RTS)
353       if (first) {
354           // don't yield the first time, we want a chance to run this
355           // thread for a bit, even if there are others banging at the
356           // door.
357           first = rtsFalse;
358           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
359       } else {
360           // Yield the capability to higher-priority tasks if necessary.
361           yieldCapability(&cap, task);
362       }
363 #endif
364       
365 #if defined(THREADED_RTS)
366       schedulePushWork(cap,task);
367 #endif
368
369     // Check whether we have re-entered the RTS from Haskell without
370     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
371     // call).
372     if (cap->in_haskell) {
373           errorBelch("schedule: re-entered unsafely.\n"
374                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
375           stg_exit(EXIT_FAILURE);
376     }
377
378     // The interruption / shutdown sequence.
379     // 
380     // In order to cleanly shut down the runtime, we want to:
381     //   * make sure that all main threads return to their callers
382     //     with the state 'Interrupted'.
383     //   * clean up all OS threads assocated with the runtime
384     //   * free all memory etc.
385     //
386     // So the sequence for ^C goes like this:
387     //
388     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
389     //     arranges for some Capability to wake up
390     //
391     //   * all threads in the system are halted, and the zombies are
392     //     placed on the run queue for cleaning up.  We acquire all
393     //     the capabilities in order to delete the threads, this is
394     //     done by scheduleDoGC() for convenience (because GC already
395     //     needs to acquire all the capabilities).  We can't kill
396     //     threads involved in foreign calls.
397     // 
398     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
399     //
400     //   * sched_state := SCHED_SHUTTING_DOWN
401     //
402     //   * all workers exit when the run queue on their capability
403     //     drains.  All main threads will also exit when their TSO
404     //     reaches the head of the run queue and they can return.
405     //
406     //   * eventually all Capabilities will shut down, and the RTS can
407     //     exit.
408     //
409     //   * We might be left with threads blocked in foreign calls, 
410     //     we should really attempt to kill these somehow (TODO);
411     
412     switch (sched_state) {
413     case SCHED_RUNNING:
414         break;
415     case SCHED_INTERRUPTING:
416         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
417 #if defined(THREADED_RTS)
418         discardSparksCap(cap);
419 #endif
420         /* scheduleDoGC() deletes all the threads */
421         cap = scheduleDoGC(cap,task,rtsFalse);
422         break;
423     case SCHED_SHUTTING_DOWN:
424         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
425         // If we are a worker, just exit.  If we're a bound thread
426         // then we will exit below when we've removed our TSO from
427         // the run queue.
428         if (task->tso == NULL && emptyRunQueue(cap)) {
429             return cap;
430         }
431         break;
432     default:
433         barf("sched_state: %d", sched_state);
434     }
435
436 #if defined(THREADED_RTS)
437     // If the run queue is empty, take a spark and turn it into a thread.
438     {
439         if (emptyRunQueue(cap)) {
440             StgClosure *spark;
441             spark = findSpark(cap);
442             if (spark != NULL) {
443                 debugTrace(DEBUG_sched,
444                            "turning spark of closure %p into a thread",
445                            (StgClosure *)spark);
446                 createSparkThread(cap,spark);     
447             }
448         }
449     }
450 #endif // THREADED_RTS
451
452     scheduleStartSignalHandlers(cap);
453
454     // Only check the black holes here if we've nothing else to do.
455     // During normal execution, the black hole list only gets checked
456     // at GC time, to avoid repeatedly traversing this possibly long
457     // list each time around the scheduler.
458     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
459
460     scheduleCheckWakeupThreads(cap);
461
462     scheduleCheckBlockedThreads(cap);
463
464     scheduleDetectDeadlock(cap,task);
465 #if defined(THREADED_RTS)
466     cap = task->cap;    // reload cap, it might have changed
467 #endif
468
469     // Normally, the only way we can get here with no threads to
470     // run is if a keyboard interrupt received during 
471     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
472     // Additionally, it is not fatal for the
473     // threaded RTS to reach here with no threads to run.
474     //
475     // win32: might be here due to awaitEvent() being abandoned
476     // as a result of a console event having been delivered.
477     if ( emptyRunQueue(cap) ) {
478 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
479         ASSERT(sched_state >= SCHED_INTERRUPTING);
480 #endif
481         continue; // nothing to do
482     }
483
484 #if defined(PARALLEL_HASKELL)
485     scheduleSendPendingMessages();
486     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
487         continue;
488
489 #if defined(SPARKS)
490     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
491 #endif
492
493     /* If we still have no work we need to send a FISH to get a spark
494        from another PE */
495     if (emptyRunQueue(cap)) {
496         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
497         ASSERT(rtsFalse); // should not happen at the moment
498     }
499     // from here: non-empty run queue.
500     //  TODO: merge above case with this, only one call processMessages() !
501     if (PacketsWaiting()) {  /* process incoming messages, if
502                                 any pending...  only in else
503                                 because getRemoteWork waits for
504                                 messages as well */
505         receivedFinish = processMessages();
506     }
507 #endif
508
509 #if defined(GRAN)
510     scheduleProcessEvent(event);
511 #endif
512
513     // 
514     // Get a thread to run
515     //
516     t = popRunQueue(cap);
517
518 #if defined(GRAN) || defined(PAR)
519     scheduleGranParReport(); // some kind of debuging output
520 #else
521     // Sanity check the thread we're about to run.  This can be
522     // expensive if there is lots of thread switching going on...
523     IF_DEBUG(sanity,checkTSO(t));
524 #endif
525
526 #if defined(THREADED_RTS)
527     // Check whether we can run this thread in the current task.
528     // If not, we have to pass our capability to the right task.
529     {
530         Task *bound = t->bound;
531       
532         if (bound) {
533             if (bound == task) {
534                 debugTrace(DEBUG_sched,
535                            "### Running thread %lu in bound thread", (unsigned long)t->id);
536                 // yes, the Haskell thread is bound to the current native thread
537             } else {
538                 debugTrace(DEBUG_sched,
539                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
540                 // no, bound to a different Haskell thread: pass to that thread
541                 pushOnRunQueue(cap,t);
542                 continue;
543             }
544         } else {
545             // The thread we want to run is unbound.
546             if (task->tso) { 
547                 debugTrace(DEBUG_sched,
548                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
549                 // no, the current native thread is bound to a different
550                 // Haskell thread, so pass it to any worker thread
551                 pushOnRunQueue(cap,t);
552                 continue; 
553             }
554         }
555     }
556 #endif
557
558     cap->r.rCurrentTSO = t;
559     
560     /* context switches are initiated by the timer signal, unless
561      * the user specified "context switch as often as possible", with
562      * +RTS -C0
563      */
564     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
565         && !emptyThreadQueues(cap)) {
566         context_switch = 1;
567     }
568          
569 run_thread:
570
571     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
572                               (long)t->id, whatNext_strs[t->what_next]);
573
574     startHeapProfTimer();
575
576     // Check for exceptions blocked on this thread
577     maybePerformBlockedException (cap, t);
578
579     // ----------------------------------------------------------------------
580     // Run the current thread 
581
582     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
583     ASSERT(t->cap == cap);
584
585     prev_what_next = t->what_next;
586
587     errno = t->saved_errno;
588 #if mingw32_HOST_OS
589     SetLastError(t->saved_winerror);
590 #endif
591
592     cap->in_haskell = rtsTrue;
593
594     dirtyTSO(t);
595
596     recent_activity = ACTIVITY_YES;
597
598     switch (prev_what_next) {
599         
600     case ThreadKilled:
601     case ThreadComplete:
602         /* Thread already finished, return to scheduler. */
603         ret = ThreadFinished;
604         break;
605         
606     case ThreadRunGHC:
607     {
608         StgRegTable *r;
609         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
610         cap = regTableToCapability(r);
611         ret = r->rRet;
612         break;
613     }
614     
615     case ThreadInterpret:
616         cap = interpretBCO(cap);
617         ret = cap->r.rRet;
618         break;
619         
620     default:
621         barf("schedule: invalid what_next field");
622     }
623
624     cap->in_haskell = rtsFalse;
625
626     // The TSO might have moved, eg. if it re-entered the RTS and a GC
627     // happened.  So find the new location:
628     t = cap->r.rCurrentTSO;
629
630     // We have run some Haskell code: there might be blackhole-blocked
631     // threads to wake up now.
632     // Lock-free test here should be ok, we're just setting a flag.
633     if ( blackhole_queue != END_TSO_QUEUE ) {
634         blackholes_need_checking = rtsTrue;
635     }
636     
637     // And save the current errno in this thread.
638     // XXX: possibly bogus for SMP because this thread might already
639     // be running again, see code below.
640     t->saved_errno = errno;
641 #if mingw32_HOST_OS
642     // Similarly for Windows error code
643     t->saved_winerror = GetLastError();
644 #endif
645
646 #if defined(THREADED_RTS)
647     // If ret is ThreadBlocked, and this Task is bound to the TSO that
648     // blocked, we are in limbo - the TSO is now owned by whatever it
649     // is blocked on, and may in fact already have been woken up,
650     // perhaps even on a different Capability.  It may be the case
651     // that task->cap != cap.  We better yield this Capability
652     // immediately and return to normaility.
653     if (ret == ThreadBlocked) {
654         debugTrace(DEBUG_sched,
655                    "--<< thread %lu (%s) stopped: blocked",
656                    (unsigned long)t->id, whatNext_strs[t->what_next]);
657         continue;
658     }
659 #endif
660
661     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
662     ASSERT(t->cap == cap);
663
664     // ----------------------------------------------------------------------
665     
666     // Costs for the scheduler are assigned to CCS_SYSTEM
667     stopHeapProfTimer();
668 #if defined(PROFILING)
669     CCCS = CCS_SYSTEM;
670 #endif
671     
672     schedulePostRunThread();
673
674     ready_to_gc = rtsFalse;
675
676     switch (ret) {
677     case HeapOverflow:
678         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
679         break;
680
681     case StackOverflow:
682         scheduleHandleStackOverflow(cap,task,t);
683         break;
684
685     case ThreadYielding:
686         if (scheduleHandleYield(cap, t, prev_what_next)) {
687             // shortcut for switching between compiler/interpreter:
688             goto run_thread; 
689         }
690         break;
691
692     case ThreadBlocked:
693         scheduleHandleThreadBlocked(t);
694         break;
695
696     case ThreadFinished:
697         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
698         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
699         break;
700
701     default:
702       barf("schedule: invalid thread return code %d", (int)ret);
703     }
704
705     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
706       cap = scheduleDoGC(cap,task,rtsFalse);
707     }
708   } /* end of while() */
709
710   debugTrace(PAR_DEBUG_verbose,
711              "== Leaving schedule() after having received Finish");
712 }
713
714 /* ----------------------------------------------------------------------------
715  * Setting up the scheduler loop
716  * ------------------------------------------------------------------------- */
717
718 static void
719 schedulePreLoop(void)
720 {
721 #if defined(GRAN) 
722     /* set up first event to get things going */
723     /* ToDo: assign costs for system setup and init MainTSO ! */
724     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
725               ContinueThread, 
726               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
727     
728     debugTrace (DEBUG_gran,
729                 "GRAN: Init CurrentTSO (in schedule) = %p", 
730                 CurrentTSO);
731     IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
732     
733     if (RtsFlags.GranFlags.Light) {
734         /* Save current time; GranSim Light only */
735         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
736     }      
737 #endif
738 }
739
740 /* -----------------------------------------------------------------------------
741  * schedulePushWork()
742  *
743  * Push work to other Capabilities if we have some.
744  * -------------------------------------------------------------------------- */
745
746 #if defined(THREADED_RTS)
747 static void
748 schedulePushWork(Capability *cap USED_IF_THREADS, 
749                  Task *task      USED_IF_THREADS)
750 {
751     Capability *free_caps[n_capabilities], *cap0;
752     nat i, n_free_caps;
753
754     // migration can be turned off with +RTS -qg
755     if (!RtsFlags.ParFlags.migrate) return;
756
757     // Check whether we have more threads on our run queue, or sparks
758     // in our pool, that we could hand to another Capability.
759     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
760         && sparkPoolSizeCap(cap) < 2) {
761         return;
762     }
763
764     // First grab as many free Capabilities as we can.
765     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
766         cap0 = &capabilities[i];
767         if (cap != cap0 && tryGrabCapability(cap0,task)) {
768             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
769                 // it already has some work, we just grabbed it at 
770                 // the wrong moment.  Or maybe it's deadlocked!
771                 releaseCapability(cap0);
772             } else {
773                 free_caps[n_free_caps++] = cap0;
774             }
775         }
776     }
777
778     // we now have n_free_caps free capabilities stashed in
779     // free_caps[].  Share our run queue equally with them.  This is
780     // probably the simplest thing we could do; improvements we might
781     // want to do include:
782     //
783     //   - giving high priority to moving relatively new threads, on 
784     //     the gournds that they haven't had time to build up a
785     //     working set in the cache on this CPU/Capability.
786     //
787     //   - giving low priority to moving long-lived threads
788
789     if (n_free_caps > 0) {
790         StgTSO *prev, *t, *next;
791         rtsBool pushed_to_all;
792
793         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
794
795         i = 0;
796         pushed_to_all = rtsFalse;
797
798         if (cap->run_queue_hd != END_TSO_QUEUE) {
799             prev = cap->run_queue_hd;
800             t = prev->link;
801             prev->link = END_TSO_QUEUE;
802             for (; t != END_TSO_QUEUE; t = next) {
803                 next = t->link;
804                 t->link = END_TSO_QUEUE;
805                 if (t->what_next == ThreadRelocated
806                     || t->bound == task // don't move my bound thread
807                     || tsoLocked(t)) {  // don't move a locked thread
808                     prev->link = t;
809                     prev = t;
810                 } else if (i == n_free_caps) {
811                     pushed_to_all = rtsTrue;
812                     i = 0;
813                     // keep one for us
814                     prev->link = t;
815                     prev = t;
816                 } else {
817                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
818                     appendToRunQueue(free_caps[i],t);
819                     if (t->bound) { t->bound->cap = free_caps[i]; }
820                     t->cap = free_caps[i];
821                     i++;
822                 }
823             }
824             cap->run_queue_tl = prev;
825         }
826
827         // If there are some free capabilities that we didn't push any
828         // threads to, then try to push a spark to each one.
829         if (!pushed_to_all) {
830             StgClosure *spark;
831             // i is the next free capability to push to
832             for (; i < n_free_caps; i++) {
833                 if (emptySparkPoolCap(free_caps[i])) {
834                     spark = findSpark(cap);
835                     if (spark != NULL) {
836                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
837                         newSpark(&(free_caps[i]->r), spark);
838                     }
839                 }
840             }
841         }
842
843         // release the capabilities
844         for (i = 0; i < n_free_caps; i++) {
845             task->cap = free_caps[i];
846             releaseCapability(free_caps[i]);
847         }
848     }
849     task->cap = cap; // reset to point to our Capability.
850 }
851 #endif
852
853 /* ----------------------------------------------------------------------------
854  * Start any pending signal handlers
855  * ------------------------------------------------------------------------- */
856
857 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
858 static void
859 scheduleStartSignalHandlers(Capability *cap)
860 {
861     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
862         // safe outside the lock
863         startSignalHandlers(cap);
864     }
865 }
866 #else
867 static void
868 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
869 {
870 }
871 #endif
872
873 /* ----------------------------------------------------------------------------
874  * Check for blocked threads that can be woken up.
875  * ------------------------------------------------------------------------- */
876
877 static void
878 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
879 {
880 #if !defined(THREADED_RTS)
881     //
882     // Check whether any waiting threads need to be woken up.  If the
883     // run queue is empty, and there are no other tasks running, we
884     // can wait indefinitely for something to happen.
885     //
886     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
887     {
888         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
889     }
890 #endif
891 }
892
893
894 /* ----------------------------------------------------------------------------
895  * Check for threads woken up by other Capabilities
896  * ------------------------------------------------------------------------- */
897
898 static void
899 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
900 {
901 #if defined(THREADED_RTS)
902     // Any threads that were woken up by other Capabilities get
903     // appended to our run queue.
904     if (!emptyWakeupQueue(cap)) {
905         ACQUIRE_LOCK(&cap->lock);
906         if (emptyRunQueue(cap)) {
907             cap->run_queue_hd = cap->wakeup_queue_hd;
908             cap->run_queue_tl = cap->wakeup_queue_tl;
909         } else {
910             cap->run_queue_tl->link = cap->wakeup_queue_hd;
911             cap->run_queue_tl = cap->wakeup_queue_tl;
912         }
913         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
914         RELEASE_LOCK(&cap->lock);
915     }
916 #endif
917 }
918
919 /* ----------------------------------------------------------------------------
920  * Check for threads blocked on BLACKHOLEs that can be woken up
921  * ------------------------------------------------------------------------- */
922 static void
923 scheduleCheckBlackHoles (Capability *cap)
924 {
925     if ( blackholes_need_checking ) // check without the lock first
926     {
927         ACQUIRE_LOCK(&sched_mutex);
928         if ( blackholes_need_checking ) {
929             checkBlackHoles(cap);
930             blackholes_need_checking = rtsFalse;
931         }
932         RELEASE_LOCK(&sched_mutex);
933     }
934 }
935
936 /* ----------------------------------------------------------------------------
937  * Detect deadlock conditions and attempt to resolve them.
938  * ------------------------------------------------------------------------- */
939
940 static void
941 scheduleDetectDeadlock (Capability *cap, Task *task)
942 {
943
944 #if defined(PARALLEL_HASKELL)
945     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
946     return;
947 #endif
948
949     /* 
950      * Detect deadlock: when we have no threads to run, there are no
951      * threads blocked, waiting for I/O, or sleeping, and all the
952      * other tasks are waiting for work, we must have a deadlock of
953      * some description.
954      */
955     if ( emptyThreadQueues(cap) )
956     {
957 #if defined(THREADED_RTS)
958         /* 
959          * In the threaded RTS, we only check for deadlock if there
960          * has been no activity in a complete timeslice.  This means
961          * we won't eagerly start a full GC just because we don't have
962          * any threads to run currently.
963          */
964         if (recent_activity != ACTIVITY_INACTIVE) return;
965 #endif
966
967         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
968
969         // Garbage collection can release some new threads due to
970         // either (a) finalizers or (b) threads resurrected because
971         // they are unreachable and will therefore be sent an
972         // exception.  Any threads thus released will be immediately
973         // runnable.
974         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
975
976         recent_activity = ACTIVITY_DONE_GC;
977         
978         if ( !emptyRunQueue(cap) ) return;
979
980 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
981         /* If we have user-installed signal handlers, then wait
982          * for signals to arrive rather then bombing out with a
983          * deadlock.
984          */
985         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
986             debugTrace(DEBUG_sched,
987                        "still deadlocked, waiting for signals...");
988
989             awaitUserSignals();
990
991             if (signals_pending()) {
992                 startSignalHandlers(cap);
993             }
994
995             // either we have threads to run, or we were interrupted:
996             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
997         }
998 #endif
999
1000 #if !defined(THREADED_RTS)
1001         /* Probably a real deadlock.  Send the current main thread the
1002          * Deadlock exception.
1003          */
1004         if (task->tso) {
1005             switch (task->tso->why_blocked) {
1006             case BlockedOnSTM:
1007             case BlockedOnBlackHole:
1008             case BlockedOnException:
1009             case BlockedOnMVar:
1010                 throwToSingleThreaded(cap, task->tso, 
1011                                       (StgClosure *)NonTermination_closure);
1012                 return;
1013             default:
1014                 barf("deadlock: main thread blocked in a strange way");
1015             }
1016         }
1017         return;
1018 #endif
1019     }
1020 }
1021
1022 /* ----------------------------------------------------------------------------
1023  * Process an event (GRAN only)
1024  * ------------------------------------------------------------------------- */
1025
1026 #if defined(GRAN)
1027 static StgTSO *
1028 scheduleProcessEvent(rtsEvent *event)
1029 {
1030     StgTSO *t;
1031
1032     if (RtsFlags.GranFlags.Light)
1033       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1034
1035     /* adjust time based on time-stamp */
1036     if (event->time > CurrentTime[CurrentProc] &&
1037         event->evttype != ContinueThread)
1038       CurrentTime[CurrentProc] = event->time;
1039     
1040     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1041     if (!RtsFlags.GranFlags.Light)
1042       handleIdlePEs();
1043
1044     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1045
1046     /* main event dispatcher in GranSim */
1047     switch (event->evttype) {
1048       /* Should just be continuing execution */
1049     case ContinueThread:
1050       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1051       /* ToDo: check assertion
1052       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1053              run_queue_hd != END_TSO_QUEUE);
1054       */
1055       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1056       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1057           procStatus[CurrentProc]==Fetching) {
1058         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1059               CurrentTSO->id, CurrentTSO, CurrentProc);
1060         goto next_thread;
1061       } 
1062       /* Ignore ContinueThreads for completed threads */
1063       if (CurrentTSO->what_next == ThreadComplete) {
1064         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1065               CurrentTSO->id, CurrentTSO, CurrentProc);
1066         goto next_thread;
1067       } 
1068       /* Ignore ContinueThreads for threads that are being migrated */
1069       if (PROCS(CurrentTSO)==Nowhere) { 
1070         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1071               CurrentTSO->id, CurrentTSO, CurrentProc);
1072         goto next_thread;
1073       }
1074       /* The thread should be at the beginning of the run queue */
1075       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1076         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1077               CurrentTSO->id, CurrentTSO, CurrentProc);
1078         break; // run the thread anyway
1079       }
1080       /*
1081       new_event(proc, proc, CurrentTime[proc],
1082                 FindWork,
1083                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1084       goto next_thread; 
1085       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1086       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1087
1088     case FetchNode:
1089       do_the_fetchnode(event);
1090       goto next_thread;             /* handle next event in event queue  */
1091       
1092     case GlobalBlock:
1093       do_the_globalblock(event);
1094       goto next_thread;             /* handle next event in event queue  */
1095       
1096     case FetchReply:
1097       do_the_fetchreply(event);
1098       goto next_thread;             /* handle next event in event queue  */
1099       
1100     case UnblockThread:   /* Move from the blocked queue to the tail of */
1101       do_the_unblock(event);
1102       goto next_thread;             /* handle next event in event queue  */
1103       
1104     case ResumeThread:  /* Move from the blocked queue to the tail of */
1105       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1106       event->tso->gran.blocktime += 
1107         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1108       do_the_startthread(event);
1109       goto next_thread;             /* handle next event in event queue  */
1110       
1111     case StartThread:
1112       do_the_startthread(event);
1113       goto next_thread;             /* handle next event in event queue  */
1114       
1115     case MoveThread:
1116       do_the_movethread(event);
1117       goto next_thread;             /* handle next event in event queue  */
1118       
1119     case MoveSpark:
1120       do_the_movespark(event);
1121       goto next_thread;             /* handle next event in event queue  */
1122       
1123     case FindWork:
1124       do_the_findwork(event);
1125       goto next_thread;             /* handle next event in event queue  */
1126       
1127     default:
1128       barf("Illegal event type %u\n", event->evttype);
1129     }  /* switch */
1130     
1131     /* This point was scheduler_loop in the old RTS */
1132
1133     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1134
1135     TimeOfLastEvent = CurrentTime[CurrentProc];
1136     TimeOfNextEvent = get_time_of_next_event();
1137     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1138     // CurrentTSO = ThreadQueueHd;
1139
1140     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1141                          TimeOfNextEvent));
1142
1143     if (RtsFlags.GranFlags.Light) 
1144       GranSimLight_leave_system(event, &ActiveTSO); 
1145
1146     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1147
1148     IF_DEBUG(gran, 
1149              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1150
1151     /* in a GranSim setup the TSO stays on the run queue */
1152     t = CurrentTSO;
1153     /* Take a thread from the run queue. */
1154     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1155
1156     IF_DEBUG(gran, 
1157              debugBelch("GRAN: About to run current thread, which is\n");
1158              G_TSO(t,5));
1159
1160     context_switch = 0; // turned on via GranYield, checking events and time slice
1161
1162     IF_DEBUG(gran, 
1163              DumpGranEvent(GR_SCHEDULE, t));
1164
1165     procStatus[CurrentProc] = Busy;
1166 }
1167 #endif // GRAN
1168
1169 /* ----------------------------------------------------------------------------
1170  * Send pending messages (PARALLEL_HASKELL only)
1171  * ------------------------------------------------------------------------- */
1172
1173 #if defined(PARALLEL_HASKELL)
1174 static StgTSO *
1175 scheduleSendPendingMessages(void)
1176 {
1177     StgSparkPool *pool;
1178     rtsSpark spark;
1179     StgTSO *t;
1180
1181 # if defined(PAR) // global Mem.Mgmt., omit for now
1182     if (PendingFetches != END_BF_QUEUE) {
1183         processFetches();
1184     }
1185 # endif
1186     
1187     if (RtsFlags.ParFlags.BufferTime) {
1188         // if we use message buffering, we must send away all message
1189         // packets which have become too old...
1190         sendOldBuffers(); 
1191     }
1192 }
1193 #endif
1194
1195 /* ----------------------------------------------------------------------------
1196  * Activate spark threads (PARALLEL_HASKELL only)
1197  * ------------------------------------------------------------------------- */
1198
1199 #if defined(PARALLEL_HASKELL)
1200 static void
1201 scheduleActivateSpark(void)
1202 {
1203 #if defined(SPARKS)
1204   ASSERT(emptyRunQueue());
1205 /* We get here if the run queue is empty and want some work.
1206    We try to turn a spark into a thread, and add it to the run queue,
1207    from where it will be picked up in the next iteration of the scheduler
1208    loop.
1209 */
1210
1211       /* :-[  no local threads => look out for local sparks */
1212       /* the spark pool for the current PE */
1213       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1214       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1215           pool->hd < pool->tl) {
1216         /* 
1217          * ToDo: add GC code check that we really have enough heap afterwards!!
1218          * Old comment:
1219          * If we're here (no runnable threads) and we have pending
1220          * sparks, we must have a space problem.  Get enough space
1221          * to turn one of those pending sparks into a
1222          * thread... 
1223          */
1224
1225         spark = findSpark(rtsFalse);            /* get a spark */
1226         if (spark != (rtsSpark) NULL) {
1227           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1228           IF_PAR_DEBUG(fish, // schedule,
1229                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1230                              tso->id, tso, advisory_thread_count));
1231
1232           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1233             IF_PAR_DEBUG(fish, // schedule,
1234                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1235                             spark));
1236             return rtsFalse; /* failed to generate a thread */
1237           }                  /* otherwise fall through & pick-up new tso */
1238         } else {
1239           IF_PAR_DEBUG(fish, // schedule,
1240                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1241                              spark_queue_len(pool)));
1242           return rtsFalse;  /* failed to generate a thread */
1243         }
1244         return rtsTrue;  /* success in generating a thread */
1245   } else { /* no more threads permitted or pool empty */
1246     return rtsFalse;  /* failed to generateThread */
1247   }
1248 #else
1249   tso = NULL; // avoid compiler warning only
1250   return rtsFalse;  /* dummy in non-PAR setup */
1251 #endif // SPARKS
1252 }
1253 #endif // PARALLEL_HASKELL
1254
1255 /* ----------------------------------------------------------------------------
1256  * Get work from a remote node (PARALLEL_HASKELL only)
1257  * ------------------------------------------------------------------------- */
1258     
1259 #if defined(PARALLEL_HASKELL)
1260 static rtsBool
1261 scheduleGetRemoteWork(rtsBool *receivedFinish)
1262 {
1263   ASSERT(emptyRunQueue());
1264
1265   if (RtsFlags.ParFlags.BufferTime) {
1266         IF_PAR_DEBUG(verbose, 
1267                 debugBelch("...send all pending data,"));
1268         {
1269           nat i;
1270           for (i=1; i<=nPEs; i++)
1271             sendImmediately(i); // send all messages away immediately
1272         }
1273   }
1274 # ifndef SPARKS
1275         //++EDEN++ idle() , i.e. send all buffers, wait for work
1276         // suppress fishing in EDEN... just look for incoming messages
1277         // (blocking receive)
1278   IF_PAR_DEBUG(verbose, 
1279                debugBelch("...wait for incoming messages...\n"));
1280   *receivedFinish = processMessages(); // blocking receive...
1281
1282         // and reenter scheduling loop after having received something
1283         // (return rtsFalse below)
1284
1285 # else /* activate SPARKS machinery */
1286 /* We get here, if we have no work, tried to activate a local spark, but still
1287    have no work. We try to get a remote spark, by sending a FISH message.
1288    Thread migration should be added here, and triggered when a sequence of 
1289    fishes returns without work. */
1290         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1291
1292       /* =8-[  no local sparks => look for work on other PEs */
1293         /*
1294          * We really have absolutely no work.  Send out a fish
1295          * (there may be some out there already), and wait for
1296          * something to arrive.  We clearly can't run any threads
1297          * until a SCHEDULE or RESUME arrives, and so that's what
1298          * we're hoping to see.  (Of course, we still have to
1299          * respond to other types of messages.)
1300          */
1301         rtsTime now = msTime() /*CURRENT_TIME*/;
1302         IF_PAR_DEBUG(verbose, 
1303                      debugBelch("--  now=%ld\n", now));
1304         IF_PAR_DEBUG(fish, // verbose,
1305              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1306                  (last_fish_arrived_at!=0 &&
1307                   last_fish_arrived_at+delay > now)) {
1308                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1309                      now, last_fish_arrived_at+delay, 
1310                      last_fish_arrived_at,
1311                      delay);
1312              });
1313   
1314         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1315             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1316           if (last_fish_arrived_at==0 ||
1317               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1318             /* outstandingFishes is set in sendFish, processFish;
1319                avoid flooding system with fishes via delay */
1320     next_fish_to_send_at = 0;  
1321   } else {
1322     /* ToDo: this should be done in the main scheduling loop to avoid the
1323              busy wait here; not so bad if fish delay is very small  */
1324     int iq = 0; // DEBUGGING -- HWL
1325     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1326     /* send a fish when ready, but process messages that arrive in the meantime */
1327     do {
1328       if (PacketsWaiting()) {
1329         iq++; // DEBUGGING
1330         *receivedFinish = processMessages();
1331       }
1332       now = msTime();
1333     } while (!*receivedFinish || now<next_fish_to_send_at);
1334     // JB: This means the fish could become obsolete, if we receive
1335     // work. Better check for work again? 
1336     // last line: while (!receivedFinish || !haveWork || now<...)
1337     // next line: if (receivedFinish || haveWork )
1338
1339     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1340       return rtsFalse;  // NB: this will leave scheduler loop
1341                         // immediately after return!
1342                           
1343     IF_PAR_DEBUG(fish, // verbose,
1344                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1345
1346   }
1347
1348     // JB: IMHO, this should all be hidden inside sendFish(...)
1349     /* pe = choosePE(); 
1350        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1351                 NEW_FISH_HUNGER);
1352
1353     // Global statistics: count no. of fishes
1354     if (RtsFlags.ParFlags.ParStats.Global &&
1355          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1356            globalParStats.tot_fish_mess++;
1357            }
1358     */ 
1359
1360   /* delayed fishes must have been sent by now! */
1361   next_fish_to_send_at = 0;  
1362   }
1363       
1364   *receivedFinish = processMessages();
1365 # endif /* SPARKS */
1366
1367  return rtsFalse;
1368  /* NB: this function always returns rtsFalse, meaning the scheduler
1369     loop continues with the next iteration; 
1370     rationale: 
1371       return code means success in finding work; we enter this function
1372       if there is no local work, thus have to send a fish which takes
1373       time until it arrives with work; in the meantime we should process
1374       messages in the main loop;
1375  */
1376 }
1377 #endif // PARALLEL_HASKELL
1378
1379 /* ----------------------------------------------------------------------------
1380  * PAR/GRAN: Report stats & debugging info(?)
1381  * ------------------------------------------------------------------------- */
1382
1383 #if defined(PAR) || defined(GRAN)
1384 static void
1385 scheduleGranParReport(void)
1386 {
1387   ASSERT(run_queue_hd != END_TSO_QUEUE);
1388
1389   /* Take a thread from the run queue, if we have work */
1390   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1391
1392     /* If this TSO has got its outport closed in the meantime, 
1393      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1394      * It has to be marked as TH_DEAD for this purpose.
1395      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1396
1397 JB: TODO: investigate wether state change field could be nuked
1398      entirely and replaced by the normal tso state (whatnext
1399      field). All we want to do is to kill tsos from outside.
1400      */
1401
1402     /* ToDo: write something to the log-file
1403     if (RTSflags.ParFlags.granSimStats && !sameThread)
1404         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1405
1406     CurrentTSO = t;
1407     */
1408     /* the spark pool for the current PE */
1409     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1410
1411     IF_DEBUG(scheduler, 
1412              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1413                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1414
1415     IF_PAR_DEBUG(fish,
1416              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1417                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1418
1419     if (RtsFlags.ParFlags.ParStats.Full && 
1420         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1421         (emitSchedule || // forced emit
1422          (t && LastTSO && t->id != LastTSO->id))) {
1423       /* 
1424          we are running a different TSO, so write a schedule event to log file
1425          NB: If we use fair scheduling we also have to write  a deschedule 
1426              event for LastTSO; with unfair scheduling we know that the
1427              previous tso has blocked whenever we switch to another tso, so
1428              we don't need it in GUM for now
1429       */
1430       IF_PAR_DEBUG(fish, // schedule,
1431                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1432
1433       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1434                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1435       emitSchedule = rtsFalse;
1436     }
1437 }     
1438 #endif
1439
1440 /* ----------------------------------------------------------------------------
1441  * After running a thread...
1442  * ------------------------------------------------------------------------- */
1443
1444 static void
1445 schedulePostRunThread(void)
1446 {
1447 #if defined(PAR)
1448     /* HACK 675: if the last thread didn't yield, make sure to print a 
1449        SCHEDULE event to the log file when StgRunning the next thread, even
1450        if it is the same one as before */
1451     LastTSO = t; 
1452     TimeOfLastYield = CURRENT_TIME;
1453 #endif
1454
1455   /* some statistics gathering in the parallel case */
1456
1457 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1458   switch (ret) {
1459     case HeapOverflow:
1460 # if defined(GRAN)
1461       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1462       globalGranStats.tot_heapover++;
1463 # elif defined(PAR)
1464       globalParStats.tot_heapover++;
1465 # endif
1466       break;
1467
1468      case StackOverflow:
1469 # if defined(GRAN)
1470       IF_DEBUG(gran, 
1471                DumpGranEvent(GR_DESCHEDULE, t));
1472       globalGranStats.tot_stackover++;
1473 # elif defined(PAR)
1474       // IF_DEBUG(par, 
1475       // DumpGranEvent(GR_DESCHEDULE, t);
1476       globalParStats.tot_stackover++;
1477 # endif
1478       break;
1479
1480     case ThreadYielding:
1481 # if defined(GRAN)
1482       IF_DEBUG(gran, 
1483                DumpGranEvent(GR_DESCHEDULE, t));
1484       globalGranStats.tot_yields++;
1485 # elif defined(PAR)
1486       // IF_DEBUG(par, 
1487       // DumpGranEvent(GR_DESCHEDULE, t);
1488       globalParStats.tot_yields++;
1489 # endif
1490       break; 
1491
1492     case ThreadBlocked:
1493 # if defined(GRAN)
1494         debugTrace(DEBUG_sched, 
1495                    "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1496                    t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1497                    (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1498                if (t->block_info.closure!=(StgClosure*)NULL)
1499                  print_bq(t->block_info.closure);
1500                debugBelch("\n"));
1501
1502       // ??? needed; should emit block before
1503       IF_DEBUG(gran, 
1504                DumpGranEvent(GR_DESCHEDULE, t)); 
1505       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1506       /*
1507         ngoq Dogh!
1508       ASSERT(procStatus[CurrentProc]==Busy || 
1509               ((procStatus[CurrentProc]==Fetching) && 
1510               (t->block_info.closure!=(StgClosure*)NULL)));
1511       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1512           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1513             procStatus[CurrentProc]==Fetching)) 
1514         procStatus[CurrentProc] = Idle;
1515       */
1516 # elif defined(PAR)
1517 //++PAR++  blockThread() writes the event (change?)
1518 # endif
1519     break;
1520
1521   case ThreadFinished:
1522     break;
1523
1524   default:
1525     barf("parGlobalStats: unknown return code");
1526     break;
1527     }
1528 #endif
1529 }
1530
1531 /* -----------------------------------------------------------------------------
1532  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1533  * -------------------------------------------------------------------------- */
1534
1535 static rtsBool
1536 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1537 {
1538     // did the task ask for a large block?
1539     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1540         // if so, get one and push it on the front of the nursery.
1541         bdescr *bd;
1542         lnat blocks;
1543         
1544         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1545         
1546         debugTrace(DEBUG_sched,
1547                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1548                    (long)t->id, whatNext_strs[t->what_next], blocks);
1549     
1550         // don't do this if the nursery is (nearly) full, we'll GC first.
1551         if (cap->r.rCurrentNursery->link != NULL ||
1552             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1553                                                // if the nursery has only one block.
1554             
1555             ACQUIRE_SM_LOCK
1556             bd = allocGroup( blocks );
1557             RELEASE_SM_LOCK
1558             cap->r.rNursery->n_blocks += blocks;
1559             
1560             // link the new group into the list
1561             bd->link = cap->r.rCurrentNursery;
1562             bd->u.back = cap->r.rCurrentNursery->u.back;
1563             if (cap->r.rCurrentNursery->u.back != NULL) {
1564                 cap->r.rCurrentNursery->u.back->link = bd;
1565             } else {
1566 #if !defined(THREADED_RTS)
1567                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1568                        g0s0 == cap->r.rNursery);
1569 #endif
1570                 cap->r.rNursery->blocks = bd;
1571             }             
1572             cap->r.rCurrentNursery->u.back = bd;
1573             
1574             // initialise it as a nursery block.  We initialise the
1575             // step, gen_no, and flags field of *every* sub-block in
1576             // this large block, because this is easier than making
1577             // sure that we always find the block head of a large
1578             // block whenever we call Bdescr() (eg. evacuate() and
1579             // isAlive() in the GC would both have to do this, at
1580             // least).
1581             { 
1582                 bdescr *x;
1583                 for (x = bd; x < bd + blocks; x++) {
1584                     x->step = cap->r.rNursery;
1585                     x->gen_no = 0;
1586                     x->flags = 0;
1587                 }
1588             }
1589             
1590             // This assert can be a killer if the app is doing lots
1591             // of large block allocations.
1592             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1593             
1594             // now update the nursery to point to the new block
1595             cap->r.rCurrentNursery = bd;
1596             
1597             // we might be unlucky and have another thread get on the
1598             // run queue before us and steal the large block, but in that
1599             // case the thread will just end up requesting another large
1600             // block.
1601             pushOnRunQueue(cap,t);
1602             return rtsFalse;  /* not actually GC'ing */
1603         }
1604     }
1605     
1606     debugTrace(DEBUG_sched,
1607                "--<< thread %ld (%s) stopped: HeapOverflow\n", 
1608                (long)t->id, whatNext_strs[t->what_next]);
1609
1610 #if defined(GRAN)
1611     ASSERT(!is_on_queue(t,CurrentProc));
1612 #elif defined(PARALLEL_HASKELL)
1613     /* Currently we emit a DESCHEDULE event before GC in GUM.
1614        ToDo: either add separate event to distinguish SYSTEM time from rest
1615        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1616     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1617         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1618                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1619         emitSchedule = rtsTrue;
1620     }
1621 #endif
1622       
1623     pushOnRunQueue(cap,t);
1624     return rtsTrue;
1625     /* actual GC is done at the end of the while loop in schedule() */
1626 }
1627
1628 /* -----------------------------------------------------------------------------
1629  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1630  * -------------------------------------------------------------------------- */
1631
1632 static void
1633 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1634 {
1635     debugTrace (DEBUG_sched,
1636                 "--<< thread %ld (%s) stopped, StackOverflow", 
1637                 (long)t->id, whatNext_strs[t->what_next]);
1638
1639     /* just adjust the stack for this thread, then pop it back
1640      * on the run queue.
1641      */
1642     { 
1643         /* enlarge the stack */
1644         StgTSO *new_t = threadStackOverflow(cap, t);
1645         
1646         /* The TSO attached to this Task may have moved, so update the
1647          * pointer to it.
1648          */
1649         if (task->tso == t) {
1650             task->tso = new_t;
1651         }
1652         pushOnRunQueue(cap,new_t);
1653     }
1654 }
1655
1656 /* -----------------------------------------------------------------------------
1657  * Handle a thread that returned to the scheduler with ThreadYielding
1658  * -------------------------------------------------------------------------- */
1659
1660 static rtsBool
1661 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1662 {
1663     // Reset the context switch flag.  We don't do this just before
1664     // running the thread, because that would mean we would lose ticks
1665     // during GC, which can lead to unfair scheduling (a thread hogs
1666     // the CPU because the tick always arrives during GC).  This way
1667     // penalises threads that do a lot of allocation, but that seems
1668     // better than the alternative.
1669     context_switch = 0;
1670     
1671     /* put the thread back on the run queue.  Then, if we're ready to
1672      * GC, check whether this is the last task to stop.  If so, wake
1673      * up the GC thread.  getThread will block during a GC until the
1674      * GC is finished.
1675      */
1676 #ifdef DEBUG
1677     if (t->what_next != prev_what_next) {
1678         debugTrace(DEBUG_sched,
1679                    "--<< thread %ld (%s) stopped to switch evaluators", 
1680                    (long)t->id, whatNext_strs[t->what_next]);
1681     } else {
1682         debugTrace(DEBUG_sched,
1683                    "--<< thread %ld (%s) stopped, yielding",
1684                    (long)t->id, whatNext_strs[t->what_next]);
1685     }
1686 #endif
1687     
1688     IF_DEBUG(sanity,
1689              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1690              checkTSO(t));
1691     ASSERT(t->link == END_TSO_QUEUE);
1692     
1693     // Shortcut if we're just switching evaluators: don't bother
1694     // doing stack squeezing (which can be expensive), just run the
1695     // thread.
1696     if (t->what_next != prev_what_next) {
1697         return rtsTrue;
1698     }
1699     
1700 #if defined(GRAN)
1701     ASSERT(!is_on_queue(t,CurrentProc));
1702       
1703     IF_DEBUG(sanity,
1704              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1705              checkThreadQsSanity(rtsTrue));
1706
1707 #endif
1708
1709     addToRunQueue(cap,t);
1710
1711 #if defined(GRAN)
1712     /* add a ContinueThread event to actually process the thread */
1713     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1714               ContinueThread,
1715               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1716     IF_GRAN_DEBUG(bq, 
1717                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1718                   G_EVENTQ(0);
1719                   G_CURR_THREADQ(0));
1720 #endif
1721     return rtsFalse;
1722 }
1723
1724 /* -----------------------------------------------------------------------------
1725  * Handle a thread that returned to the scheduler with ThreadBlocked
1726  * -------------------------------------------------------------------------- */
1727
1728 static void
1729 scheduleHandleThreadBlocked( StgTSO *t
1730 #if !defined(GRAN) && !defined(DEBUG)
1731     STG_UNUSED
1732 #endif
1733     )
1734 {
1735 #if defined(GRAN)
1736     IF_DEBUG(scheduler,
1737              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1738                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1739              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1740     
1741     // ??? needed; should emit block before
1742     IF_DEBUG(gran, 
1743              DumpGranEvent(GR_DESCHEDULE, t)); 
1744     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1745     /*
1746       ngoq Dogh!
1747       ASSERT(procStatus[CurrentProc]==Busy || 
1748       ((procStatus[CurrentProc]==Fetching) && 
1749       (t->block_info.closure!=(StgClosure*)NULL)));
1750       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1751       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1752       procStatus[CurrentProc]==Fetching)) 
1753       procStatus[CurrentProc] = Idle;
1754     */
1755 #elif defined(PAR)
1756     IF_DEBUG(scheduler,
1757              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1758                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1759     IF_PAR_DEBUG(bq,
1760                  
1761                  if (t->block_info.closure!=(StgClosure*)NULL) 
1762                  print_bq(t->block_info.closure));
1763     
1764     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1765     blockThread(t);
1766     
1767     /* whatever we schedule next, we must log that schedule */
1768     emitSchedule = rtsTrue;
1769     
1770 #else /* !GRAN */
1771
1772       // We don't need to do anything.  The thread is blocked, and it
1773       // has tidied up its stack and placed itself on whatever queue
1774       // it needs to be on.
1775
1776     // ASSERT(t->why_blocked != NotBlocked);
1777     // Not true: for example,
1778     //    - in THREADED_RTS, the thread may already have been woken
1779     //      up by another Capability.  This actually happens: try
1780     //      conc023 +RTS -N2.
1781     //    - the thread may have woken itself up already, because
1782     //      threadPaused() might have raised a blocked throwTo
1783     //      exception, see maybePerformBlockedException().
1784
1785 #ifdef DEBUG
1786     if (traceClass(DEBUG_sched)) {
1787         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1788                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1789         printThreadBlockage(t);
1790         debugTraceEnd();
1791     }
1792 #endif
1793     
1794     /* Only for dumping event to log file 
1795        ToDo: do I need this in GranSim, too?
1796        blockThread(t);
1797     */
1798 #endif
1799 }
1800
1801 /* -----------------------------------------------------------------------------
1802  * Handle a thread that returned to the scheduler with ThreadFinished
1803  * -------------------------------------------------------------------------- */
1804
1805 static rtsBool
1806 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1807 {
1808     /* Need to check whether this was a main thread, and if so,
1809      * return with the return value.
1810      *
1811      * We also end up here if the thread kills itself with an
1812      * uncaught exception, see Exception.cmm.
1813      */
1814     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1815                (unsigned long)t->id, whatNext_strs[t->what_next]);
1816
1817 #if defined(GRAN)
1818       endThread(t, CurrentProc); // clean-up the thread
1819 #elif defined(PARALLEL_HASKELL)
1820       /* For now all are advisory -- HWL */
1821       //if(t->priority==AdvisoryPriority) ??
1822       advisory_thread_count--; // JB: Caution with this counter, buggy!
1823       
1824 # if defined(DIST)
1825       if(t->dist.priority==RevalPriority)
1826         FinishReval(t);
1827 # endif
1828     
1829 # if defined(EDENOLD)
1830       // the thread could still have an outport... (BUG)
1831       if (t->eden.outport != -1) {
1832       // delete the outport for the tso which has finished...
1833         IF_PAR_DEBUG(eden_ports,
1834                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1835                               t->eden.outport, t->id));
1836         deleteOPT(t);
1837       }
1838       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1839       if (t->eden.epid != -1) {
1840         IF_PAR_DEBUG(eden_ports,
1841                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1842                            t->id, t->eden.epid));
1843         removeTSOfromProcess(t);
1844       }
1845 # endif 
1846
1847 # if defined(PAR)
1848       if (RtsFlags.ParFlags.ParStats.Full &&
1849           !RtsFlags.ParFlags.ParStats.Suppressed) 
1850         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1851
1852       //  t->par only contains statistics: left out for now...
1853       IF_PAR_DEBUG(fish,
1854                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1855                               t->id,t,t->par.sparkname));
1856 # endif
1857 #endif // PARALLEL_HASKELL
1858
1859       //
1860       // Check whether the thread that just completed was a bound
1861       // thread, and if so return with the result.  
1862       //
1863       // There is an assumption here that all thread completion goes
1864       // through this point; we need to make sure that if a thread
1865       // ends up in the ThreadKilled state, that it stays on the run
1866       // queue so it can be dealt with here.
1867       //
1868
1869       if (t->bound) {
1870
1871           if (t->bound != task) {
1872 #if !defined(THREADED_RTS)
1873               // Must be a bound thread that is not the topmost one.  Leave
1874               // it on the run queue until the stack has unwound to the
1875               // point where we can deal with this.  Leaving it on the run
1876               // queue also ensures that the garbage collector knows about
1877               // this thread and its return value (it gets dropped from the
1878               // all_threads list so there's no other way to find it).
1879               appendToRunQueue(cap,t);
1880               return rtsFalse;
1881 #else
1882               // this cannot happen in the threaded RTS, because a
1883               // bound thread can only be run by the appropriate Task.
1884               barf("finished bound thread that isn't mine");
1885 #endif
1886           }
1887
1888           ASSERT(task->tso == t);
1889
1890           if (t->what_next == ThreadComplete) {
1891               if (task->ret) {
1892                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1893                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1894               }
1895               task->stat = Success;
1896           } else {
1897               if (task->ret) {
1898                   *(task->ret) = NULL;
1899               }
1900               if (sched_state >= SCHED_INTERRUPTING) {
1901                   task->stat = Interrupted;
1902               } else {
1903                   task->stat = Killed;
1904               }
1905           }
1906 #ifdef DEBUG
1907           removeThreadLabel((StgWord)task->tso->id);
1908 #endif
1909           return rtsTrue; // tells schedule() to return
1910       }
1911
1912       return rtsFalse;
1913 }
1914
1915 /* -----------------------------------------------------------------------------
1916  * Perform a heap census
1917  * -------------------------------------------------------------------------- */
1918
1919 static rtsBool
1920 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1921 {
1922     // When we have +RTS -i0 and we're heap profiling, do a census at
1923     // every GC.  This lets us get repeatable runs for debugging.
1924     if (performHeapProfile ||
1925         (RtsFlags.ProfFlags.profileInterval==0 &&
1926          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1927         return rtsTrue;
1928     } else {
1929         return rtsFalse;
1930     }
1931 }
1932
1933 /* -----------------------------------------------------------------------------
1934  * Perform a garbage collection if necessary
1935  * -------------------------------------------------------------------------- */
1936
1937 static Capability *
1938 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1939 {
1940     StgTSO *t;
1941     rtsBool heap_census;
1942 #ifdef THREADED_RTS
1943     static volatile StgWord waiting_for_gc;
1944     rtsBool was_waiting;
1945     nat i;
1946 #endif
1947
1948 #ifdef THREADED_RTS
1949     // In order to GC, there must be no threads running Haskell code.
1950     // Therefore, the GC thread needs to hold *all* the capabilities,
1951     // and release them after the GC has completed.  
1952     //
1953     // This seems to be the simplest way: previous attempts involved
1954     // making all the threads with capabilities give up their
1955     // capabilities and sleep except for the *last* one, which
1956     // actually did the GC.  But it's quite hard to arrange for all
1957     // the other tasks to sleep and stay asleep.
1958     //
1959         
1960     was_waiting = cas(&waiting_for_gc, 0, 1);
1961     if (was_waiting) {
1962         do {
1963             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1964             if (cap) yieldCapability(&cap,task);
1965         } while (waiting_for_gc);
1966         return cap;  // NOTE: task->cap might have changed here
1967     }
1968
1969     for (i=0; i < n_capabilities; i++) {
1970         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1971         if (cap != &capabilities[i]) {
1972             Capability *pcap = &capabilities[i];
1973             // we better hope this task doesn't get migrated to
1974             // another Capability while we're waiting for this one.
1975             // It won't, because load balancing happens while we have
1976             // all the Capabilities, but even so it's a slightly
1977             // unsavoury invariant.
1978             task->cap = pcap;
1979             context_switch = 1;
1980             waitForReturnCapability(&pcap, task);
1981             if (pcap != &capabilities[i]) {
1982                 barf("scheduleDoGC: got the wrong capability");
1983             }
1984         }
1985     }
1986
1987     waiting_for_gc = rtsFalse;
1988 #endif
1989
1990     /* Kick any transactions which are invalid back to their
1991      * atomically frames.  When next scheduled they will try to
1992      * commit, this commit will fail and they will retry.
1993      */
1994     { 
1995         StgTSO *next;
1996
1997         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1998             if (t->what_next == ThreadRelocated) {
1999                 next = t->link;
2000             } else {
2001                 next = t->global_link;
2002                 
2003                 // This is a good place to check for blocked
2004                 // exceptions.  It might be the case that a thread is
2005                 // blocked on delivering an exception to a thread that
2006                 // is also blocked - we try to ensure that this
2007                 // doesn't happen in throwTo(), but it's too hard (or
2008                 // impossible) to close all the race holes, so we
2009                 // accept that some might get through and deal with
2010                 // them here.  A GC will always happen at some point,
2011                 // even if the system is otherwise deadlocked.
2012                 maybePerformBlockedException (&capabilities[0], t);
2013
2014                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2015                     if (!stmValidateNestOfTransactions (t -> trec)) {
2016                         debugTrace(DEBUG_sched | DEBUG_stm,
2017                                    "trec %p found wasting its time", t);
2018                         
2019                         // strip the stack back to the
2020                         // ATOMICALLY_FRAME, aborting the (nested)
2021                         // transaction, and saving the stack of any
2022                         // partially-evaluated thunks on the heap.
2023                         throwToSingleThreaded_(&capabilities[0], t, 
2024                                                NULL, rtsTrue, NULL);
2025                         
2026 #ifdef REG_R1
2027                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2028 #endif
2029                     }
2030                 }
2031             }
2032         }
2033     }
2034     
2035     // so this happens periodically:
2036     if (cap) scheduleCheckBlackHoles(cap);
2037     
2038     IF_DEBUG(scheduler, printAllThreads());
2039
2040     /*
2041      * We now have all the capabilities; if we're in an interrupting
2042      * state, then we should take the opportunity to delete all the
2043      * threads in the system.
2044      */
2045     if (sched_state >= SCHED_INTERRUPTING) {
2046         deleteAllThreads(&capabilities[0]);
2047         sched_state = SCHED_SHUTTING_DOWN;
2048     }
2049     
2050     heap_census = scheduleNeedHeapProfile(rtsTrue);
2051
2052     /* everybody back, start the GC.
2053      * Could do it in this thread, or signal a condition var
2054      * to do it in another thread.  Either way, we need to
2055      * broadcast on gc_pending_cond afterward.
2056      */
2057 #if defined(THREADED_RTS)
2058     debugTrace(DEBUG_sched, "doing GC");
2059 #endif
2060     GarbageCollect(force_major || heap_census);
2061     
2062     if (heap_census) {
2063         debugTrace(DEBUG_sched, "performing heap census");
2064         heapCensus();
2065         performHeapProfile = rtsFalse;
2066     }
2067
2068 #if defined(THREADED_RTS)
2069     // release our stash of capabilities.
2070     for (i = 0; i < n_capabilities; i++) {
2071         if (cap != &capabilities[i]) {
2072             task->cap = &capabilities[i];
2073             releaseCapability(&capabilities[i]);
2074         }
2075     }
2076     if (cap) {
2077         task->cap = cap;
2078     } else {
2079         task->cap = NULL;
2080     }
2081 #endif
2082
2083 #if defined(GRAN)
2084     /* add a ContinueThread event to continue execution of current thread */
2085     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2086               ContinueThread,
2087               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2088     IF_GRAN_DEBUG(bq, 
2089                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2090                   G_EVENTQ(0);
2091                   G_CURR_THREADQ(0));
2092 #endif /* GRAN */
2093
2094     return cap;
2095 }
2096
2097 /* ---------------------------------------------------------------------------
2098  * Singleton fork(). Do not copy any running threads.
2099  * ------------------------------------------------------------------------- */
2100
2101 pid_t
2102 forkProcess(HsStablePtr *entry
2103 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2104             STG_UNUSED
2105 #endif
2106            )
2107 {
2108 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2109     Task *task;
2110     pid_t pid;
2111     StgTSO* t,*next;
2112     Capability *cap;
2113     
2114 #if defined(THREADED_RTS)
2115     if (RtsFlags.ParFlags.nNodes > 1) {
2116         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2117         stg_exit(EXIT_FAILURE);
2118     }
2119 #endif
2120
2121     debugTrace(DEBUG_sched, "forking!");
2122     
2123     // ToDo: for SMP, we should probably acquire *all* the capabilities
2124     cap = rts_lock();
2125     
2126     pid = fork();
2127     
2128     if (pid) { // parent
2129         
2130         // just return the pid
2131         rts_unlock(cap);
2132         return pid;
2133         
2134     } else { // child
2135         
2136         // Now, all OS threads except the thread that forked are
2137         // stopped.  We need to stop all Haskell threads, including
2138         // those involved in foreign calls.  Also we need to delete
2139         // all Tasks, because they correspond to OS threads that are
2140         // now gone.
2141
2142         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2143             if (t->what_next == ThreadRelocated) {
2144                 next = t->link;
2145             } else {
2146                 next = t->global_link;
2147                 // don't allow threads to catch the ThreadKilled
2148                 // exception, but we do want to raiseAsync() because these
2149                 // threads may be evaluating thunks that we need later.
2150                 deleteThread_(cap,t);
2151             }
2152         }
2153         
2154         // Empty the run queue.  It seems tempting to let all the
2155         // killed threads stay on the run queue as zombies to be
2156         // cleaned up later, but some of them correspond to bound
2157         // threads for which the corresponding Task does not exist.
2158         cap->run_queue_hd = END_TSO_QUEUE;
2159         cap->run_queue_tl = END_TSO_QUEUE;
2160
2161         // Any suspended C-calling Tasks are no more, their OS threads
2162         // don't exist now:
2163         cap->suspended_ccalling_tasks = NULL;
2164
2165         // Empty the all_threads list.  Otherwise, the garbage
2166         // collector may attempt to resurrect some of these threads.
2167         all_threads = END_TSO_QUEUE;
2168
2169         // Wipe the task list, except the current Task.
2170         ACQUIRE_LOCK(&sched_mutex);
2171         for (task = all_tasks; task != NULL; task=task->all_link) {
2172             if (task != cap->running_task) {
2173                 discardTask(task);
2174             }
2175         }
2176         RELEASE_LOCK(&sched_mutex);
2177
2178 #if defined(THREADED_RTS)
2179         // Wipe our spare workers list, they no longer exist.  New
2180         // workers will be created if necessary.
2181         cap->spare_workers = NULL;
2182         cap->returning_tasks_hd = NULL;
2183         cap->returning_tasks_tl = NULL;
2184 #endif
2185
2186         // On Unix, all timers are reset in the child, so we need to start
2187         // the timer again.
2188         startTimer();
2189
2190         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2191         rts_checkSchedStatus("forkProcess",cap);
2192         
2193         rts_unlock(cap);
2194         hs_exit();                      // clean up and exit
2195         stg_exit(EXIT_SUCCESS);
2196     }
2197 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2198     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2199     return -1;
2200 #endif
2201 }
2202
2203 /* ---------------------------------------------------------------------------
2204  * Delete all the threads in the system
2205  * ------------------------------------------------------------------------- */
2206    
2207 static void
2208 deleteAllThreads ( Capability *cap )
2209 {
2210     // NOTE: only safe to call if we own all capabilities.
2211
2212     StgTSO* t, *next;
2213     debugTrace(DEBUG_sched,"deleting all threads");
2214     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2215         if (t->what_next == ThreadRelocated) {
2216             next = t->link;
2217         } else {
2218             next = t->global_link;
2219             deleteThread(cap,t);
2220         }
2221     }      
2222
2223     // The run queue now contains a bunch of ThreadKilled threads.  We
2224     // must not throw these away: the main thread(s) will be in there
2225     // somewhere, and the main scheduler loop has to deal with it.
2226     // Also, the run queue is the only thing keeping these threads from
2227     // being GC'd, and we don't want the "main thread has been GC'd" panic.
2228
2229 #if !defined(THREADED_RTS)
2230     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2231     ASSERT(sleeping_queue == END_TSO_QUEUE);
2232 #endif
2233 }
2234
2235 /* -----------------------------------------------------------------------------
2236    Managing the suspended_ccalling_tasks list.
2237    Locks required: sched_mutex
2238    -------------------------------------------------------------------------- */
2239
2240 STATIC_INLINE void
2241 suspendTask (Capability *cap, Task *task)
2242 {
2243     ASSERT(task->next == NULL && task->prev == NULL);
2244     task->next = cap->suspended_ccalling_tasks;
2245     task->prev = NULL;
2246     if (cap->suspended_ccalling_tasks) {
2247         cap->suspended_ccalling_tasks->prev = task;
2248     }
2249     cap->suspended_ccalling_tasks = task;
2250 }
2251
2252 STATIC_INLINE void
2253 recoverSuspendedTask (Capability *cap, Task *task)
2254 {
2255     if (task->prev) {
2256         task->prev->next = task->next;
2257     } else {
2258         ASSERT(cap->suspended_ccalling_tasks == task);
2259         cap->suspended_ccalling_tasks = task->next;
2260     }
2261     if (task->next) {
2262         task->next->prev = task->prev;
2263     }
2264     task->next = task->prev = NULL;
2265 }
2266
2267 /* ---------------------------------------------------------------------------
2268  * Suspending & resuming Haskell threads.
2269  * 
2270  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2271  * its capability before calling the C function.  This allows another
2272  * task to pick up the capability and carry on running Haskell
2273  * threads.  It also means that if the C call blocks, it won't lock
2274  * the whole system.
2275  *
2276  * The Haskell thread making the C call is put to sleep for the
2277  * duration of the call, on the susepended_ccalling_threads queue.  We
2278  * give out a token to the task, which it can use to resume the thread
2279  * on return from the C function.
2280  * ------------------------------------------------------------------------- */
2281    
2282 void *
2283 suspendThread (StgRegTable *reg)
2284 {
2285   Capability *cap;
2286   int saved_errno;
2287   StgTSO *tso;
2288   Task *task;
2289 #if mingw32_HOST_OS
2290   StgWord32 saved_winerror;
2291 #endif
2292
2293   saved_errno = errno;
2294 #if mingw32_HOST_OS
2295   saved_winerror = GetLastError();
2296 #endif
2297
2298   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2299    */
2300   cap = regTableToCapability(reg);
2301
2302   task = cap->running_task;
2303   tso = cap->r.rCurrentTSO;
2304
2305   debugTrace(DEBUG_sched, 
2306              "thread %lu did a safe foreign call", 
2307              (unsigned long)cap->r.rCurrentTSO->id);
2308
2309   // XXX this might not be necessary --SDM
2310   tso->what_next = ThreadRunGHC;
2311
2312   threadPaused(cap,tso);
2313
2314   if ((tso->flags & TSO_BLOCKEX) == 0)  {
2315       tso->why_blocked = BlockedOnCCall;
2316       tso->flags |= TSO_BLOCKEX;
2317       tso->flags &= ~TSO_INTERRUPTIBLE;
2318   } else {
2319       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2320   }
2321
2322   // Hand back capability
2323   task->suspended_tso = tso;
2324
2325   ACQUIRE_LOCK(&cap->lock);
2326
2327   suspendTask(cap,task);
2328   cap->in_haskell = rtsFalse;
2329   releaseCapability_(cap);
2330   
2331   RELEASE_LOCK(&cap->lock);
2332
2333 #if defined(THREADED_RTS)
2334   /* Preparing to leave the RTS, so ensure there's a native thread/task
2335      waiting to take over.
2336   */
2337   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2338 #endif
2339
2340   errno = saved_errno;
2341 #if mingw32_HOST_OS
2342   SetLastError(saved_winerror);
2343 #endif
2344   return task;
2345 }
2346
2347 StgRegTable *
2348 resumeThread (void *task_)
2349 {
2350     StgTSO *tso;
2351     Capability *cap;
2352     Task *task = task_;
2353     int saved_errno;
2354 #if mingw32_HOST_OS
2355     StgWord32 saved_winerror;
2356 #endif
2357
2358     saved_errno = errno;
2359 #if mingw32_HOST_OS
2360     saved_winerror = GetLastError();
2361 #endif
2362
2363     cap = task->cap;
2364     // Wait for permission to re-enter the RTS with the result.
2365     waitForReturnCapability(&cap,task);
2366     // we might be on a different capability now... but if so, our
2367     // entry on the suspended_ccalling_tasks list will also have been
2368     // migrated.
2369
2370     // Remove the thread from the suspended list
2371     recoverSuspendedTask(cap,task);
2372
2373     tso = task->suspended_tso;
2374     task->suspended_tso = NULL;
2375     tso->link = END_TSO_QUEUE;
2376     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2377     
2378     if (tso->why_blocked == BlockedOnCCall) {
2379         awakenBlockedExceptionQueue(cap,tso);
2380         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2381     }
2382     
2383     /* Reset blocking status */
2384     tso->why_blocked  = NotBlocked;
2385     
2386     cap->r.rCurrentTSO = tso;
2387     cap->in_haskell = rtsTrue;
2388     errno = saved_errno;
2389 #if mingw32_HOST_OS
2390     SetLastError(saved_winerror);
2391 #endif
2392
2393     /* We might have GC'd, mark the TSO dirty again */
2394     dirtyTSO(tso);
2395
2396     IF_DEBUG(sanity, checkTSO(tso));
2397
2398     return &cap->r;
2399 }
2400
2401 /* ---------------------------------------------------------------------------
2402  * scheduleThread()
2403  *
2404  * scheduleThread puts a thread on the end  of the runnable queue.
2405  * This will usually be done immediately after a thread is created.
2406  * The caller of scheduleThread must create the thread using e.g.
2407  * createThread and push an appropriate closure
2408  * on this thread's stack before the scheduler is invoked.
2409  * ------------------------------------------------------------------------ */
2410
2411 void
2412 scheduleThread(Capability *cap, StgTSO *tso)
2413 {
2414     // The thread goes at the *end* of the run-queue, to avoid possible
2415     // starvation of any threads already on the queue.
2416     appendToRunQueue(cap,tso);
2417 }
2418
2419 void
2420 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2421 {
2422 #if defined(THREADED_RTS)
2423     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2424                               // move this thread from now on.
2425     cpu %= RtsFlags.ParFlags.nNodes;
2426     if (cpu == cap->no) {
2427         appendToRunQueue(cap,tso);
2428     } else {
2429         migrateThreadToCapability_lock(&capabilities[cpu],tso);
2430     }
2431 #else
2432     appendToRunQueue(cap,tso);
2433 #endif
2434 }
2435
2436 Capability *
2437 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2438 {
2439     Task *task;
2440
2441     // We already created/initialised the Task
2442     task = cap->running_task;
2443
2444     // This TSO is now a bound thread; make the Task and TSO
2445     // point to each other.
2446     tso->bound = task;
2447     tso->cap = cap;
2448
2449     task->tso = tso;
2450     task->ret = ret;
2451     task->stat = NoStatus;
2452
2453     appendToRunQueue(cap,tso);
2454
2455     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2456
2457 #if defined(GRAN)
2458     /* GranSim specific init */
2459     CurrentTSO = m->tso;                // the TSO to run
2460     procStatus[MainProc] = Busy;        // status of main PE
2461     CurrentProc = MainProc;             // PE to run it on
2462 #endif
2463
2464     cap = schedule(cap,task);
2465
2466     ASSERT(task->stat != NoStatus);
2467     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2468
2469     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2470     return cap;
2471 }
2472
2473 /* ----------------------------------------------------------------------------
2474  * Starting Tasks
2475  * ------------------------------------------------------------------------- */
2476
2477 #if defined(THREADED_RTS)
2478 void
2479 workerStart(Task *task)
2480 {
2481     Capability *cap;
2482
2483     // See startWorkerTask().
2484     ACQUIRE_LOCK(&task->lock);
2485     cap = task->cap;
2486     RELEASE_LOCK(&task->lock);
2487
2488     // set the thread-local pointer to the Task:
2489     taskEnter(task);
2490
2491     // schedule() runs without a lock.
2492     cap = schedule(cap,task);
2493
2494     // On exit from schedule(), we have a Capability.
2495     releaseCapability(cap);
2496     workerTaskStop(task);
2497 }
2498 #endif
2499
2500 /* ---------------------------------------------------------------------------
2501  * initScheduler()
2502  *
2503  * Initialise the scheduler.  This resets all the queues - if the
2504  * queues contained any threads, they'll be garbage collected at the
2505  * next pass.
2506  *
2507  * ------------------------------------------------------------------------ */
2508
2509 void 
2510 initScheduler(void)
2511 {
2512 #if defined(GRAN)
2513   nat i;
2514   for (i=0; i<=MAX_PROC; i++) {
2515     run_queue_hds[i]      = END_TSO_QUEUE;
2516     run_queue_tls[i]      = END_TSO_QUEUE;
2517     blocked_queue_hds[i]  = END_TSO_QUEUE;
2518     blocked_queue_tls[i]  = END_TSO_QUEUE;
2519     ccalling_threadss[i]  = END_TSO_QUEUE;
2520     blackhole_queue[i]    = END_TSO_QUEUE;
2521     sleeping_queue        = END_TSO_QUEUE;
2522   }
2523 #elif !defined(THREADED_RTS)
2524   blocked_queue_hd  = END_TSO_QUEUE;
2525   blocked_queue_tl  = END_TSO_QUEUE;
2526   sleeping_queue    = END_TSO_QUEUE;
2527 #endif
2528
2529   blackhole_queue   = END_TSO_QUEUE;
2530   all_threads       = END_TSO_QUEUE;
2531
2532   context_switch = 0;
2533   sched_state    = SCHED_RUNNING;
2534
2535 #if defined(THREADED_RTS)
2536   /* Initialise the mutex and condition variables used by
2537    * the scheduler. */
2538   initMutex(&sched_mutex);
2539 #endif
2540   
2541   ACQUIRE_LOCK(&sched_mutex);
2542
2543   /* A capability holds the state a native thread needs in
2544    * order to execute STG code. At least one capability is
2545    * floating around (only THREADED_RTS builds have more than one).
2546    */
2547   initCapabilities();
2548
2549   initTaskManager();
2550
2551 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2552   initSparkPools();
2553 #endif
2554
2555 #if defined(THREADED_RTS)
2556   /*
2557    * Eagerly start one worker to run each Capability, except for
2558    * Capability 0.  The idea is that we're probably going to start a
2559    * bound thread on Capability 0 pretty soon, so we don't want a
2560    * worker task hogging it.
2561    */
2562   { 
2563       nat i;
2564       Capability *cap;
2565       for (i = 1; i < n_capabilities; i++) {
2566           cap = &capabilities[i];
2567           ACQUIRE_LOCK(&cap->lock);
2568           startWorkerTask(cap, workerStart);
2569           RELEASE_LOCK(&cap->lock);
2570       }
2571   }
2572 #endif
2573
2574   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2575
2576   RELEASE_LOCK(&sched_mutex);
2577 }
2578
2579 void
2580 exitScheduler( rtsBool wait_foreign )
2581                /* see Capability.c, shutdownCapability() */
2582 {
2583     Task *task = NULL;
2584
2585 #if defined(THREADED_RTS)
2586     ACQUIRE_LOCK(&sched_mutex);
2587     task = newBoundTask();
2588     RELEASE_LOCK(&sched_mutex);
2589 #endif
2590
2591     // If we haven't killed all the threads yet, do it now.
2592     if (sched_state < SCHED_SHUTTING_DOWN) {
2593         sched_state = SCHED_INTERRUPTING;
2594         scheduleDoGC(NULL,task,rtsFalse);    
2595     }
2596     sched_state = SCHED_SHUTTING_DOWN;
2597
2598 #if defined(THREADED_RTS)
2599     { 
2600         nat i;
2601         
2602         for (i = 0; i < n_capabilities; i++) {
2603             shutdownCapability(&capabilities[i], task, wait_foreign);
2604         }
2605         boundTaskExiting(task);
2606         stopTaskManager();
2607     }
2608 #else
2609     freeCapability(&MainCapability);
2610 #endif
2611 }
2612
2613 void
2614 freeScheduler( void )
2615 {
2616     freeTaskManager();
2617     if (n_capabilities != 1) {
2618         stgFree(capabilities);
2619     }
2620 #if defined(THREADED_RTS)
2621     closeMutex(&sched_mutex);
2622 #endif
2623 }
2624
2625 /* ---------------------------------------------------------------------------
2626    Where are the roots that we know about?
2627
2628         - all the threads on the runnable queue
2629         - all the threads on the blocked queue
2630         - all the threads on the sleeping queue
2631         - all the thread currently executing a _ccall_GC
2632         - all the "main threads"
2633      
2634    ------------------------------------------------------------------------ */
2635
2636 /* This has to be protected either by the scheduler monitor, or by the
2637         garbage collection monitor (probably the latter).
2638         KH @ 25/10/99
2639 */
2640
2641 void
2642 GetRoots( evac_fn evac )
2643 {
2644     nat i;
2645     Capability *cap;
2646     Task *task;
2647
2648 #if defined(GRAN)
2649     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2650         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2651             evac((StgClosure **)&run_queue_hds[i]);
2652         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2653             evac((StgClosure **)&run_queue_tls[i]);
2654         
2655         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2656             evac((StgClosure **)&blocked_queue_hds[i]);
2657         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2658             evac((StgClosure **)&blocked_queue_tls[i]);
2659         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2660             evac((StgClosure **)&ccalling_threads[i]);
2661     }
2662
2663     markEventQueue();
2664
2665 #else /* !GRAN */
2666
2667     for (i = 0; i < n_capabilities; i++) {
2668         cap = &capabilities[i];
2669         evac((StgClosure **)(void *)&cap->run_queue_hd);
2670         evac((StgClosure **)(void *)&cap->run_queue_tl);
2671 #if defined(THREADED_RTS)
2672         evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2673         evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2674 #endif
2675         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2676              task=task->next) {
2677             debugTrace(DEBUG_sched,
2678                        "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
2679             evac((StgClosure **)(void *)&task->suspended_tso);
2680         }
2681
2682     }
2683     
2684
2685 #if !defined(THREADED_RTS)
2686     evac((StgClosure **)(void *)&blocked_queue_hd);
2687     evac((StgClosure **)(void *)&blocked_queue_tl);
2688     evac((StgClosure **)(void *)&sleeping_queue);
2689 #endif 
2690 #endif
2691
2692     // evac((StgClosure **)&blackhole_queue);
2693
2694 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2695     markSparkQueue(evac);
2696 #endif
2697     
2698 #if defined(RTS_USER_SIGNALS)
2699     // mark the signal handlers (signals should be already blocked)
2700     if (RtsFlags.MiscFlags.install_signal_handlers) {
2701         markSignalHandlers(evac);
2702     }
2703 #endif
2704 }
2705
2706 /* -----------------------------------------------------------------------------
2707    performGC
2708
2709    This is the interface to the garbage collector from Haskell land.
2710    We provide this so that external C code can allocate and garbage
2711    collect when called from Haskell via _ccall_GC.
2712    -------------------------------------------------------------------------- */
2713
2714 static void
2715 performGC_(rtsBool force_major)
2716 {
2717     Task *task;
2718     // We must grab a new Task here, because the existing Task may be
2719     // associated with a particular Capability, and chained onto the 
2720     // suspended_ccalling_tasks queue.
2721     ACQUIRE_LOCK(&sched_mutex);
2722     task = newBoundTask();
2723     RELEASE_LOCK(&sched_mutex);
2724     scheduleDoGC(NULL,task,force_major);
2725     boundTaskExiting(task);
2726 }
2727
2728 void
2729 performGC(void)
2730 {
2731     performGC_(rtsFalse);
2732 }
2733
2734 void
2735 performMajorGC(void)
2736 {
2737     performGC_(rtsTrue);
2738 }
2739
2740 /* -----------------------------------------------------------------------------
2741    Stack overflow
2742
2743    If the thread has reached its maximum stack size, then raise the
2744    StackOverflow exception in the offending thread.  Otherwise
2745    relocate the TSO into a larger chunk of memory and adjust its stack
2746    size appropriately.
2747    -------------------------------------------------------------------------- */
2748
2749 static StgTSO *
2750 threadStackOverflow(Capability *cap, StgTSO *tso)
2751 {
2752   nat new_stack_size, stack_words;
2753   lnat new_tso_size;
2754   StgPtr new_sp;
2755   StgTSO *dest;
2756
2757   IF_DEBUG(sanity,checkTSO(tso));
2758
2759   // don't allow throwTo() to modify the blocked_exceptions queue
2760   // while we are moving the TSO:
2761   lockClosure((StgClosure *)tso);
2762
2763   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2764       // NB. never raise a StackOverflow exception if the thread is
2765       // inside Control.Exceptino.block.  It is impractical to protect
2766       // against stack overflow exceptions, since virtually anything
2767       // can raise one (even 'catch'), so this is the only sensible
2768       // thing to do here.  See bug #767.
2769
2770       debugTrace(DEBUG_gc,
2771                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2772                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2773       IF_DEBUG(gc,
2774                /* If we're debugging, just print out the top of the stack */
2775                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2776                                                 tso->sp+64)));
2777
2778       // Send this thread the StackOverflow exception
2779       unlockTSO(tso);
2780       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2781       return tso;
2782   }
2783
2784   /* Try to double the current stack size.  If that takes us over the
2785    * maximum stack size for this thread, then use the maximum instead.
2786    * Finally round up so the TSO ends up as a whole number of blocks.
2787    */
2788   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2789   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2790                                        TSO_STRUCT_SIZE)/sizeof(W_);
2791   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2792   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2793
2794   debugTrace(DEBUG_sched, 
2795              "increasing stack size from %ld words to %d.",
2796              (long)tso->stack_size, new_stack_size);
2797
2798   dest = (StgTSO *)allocate(new_tso_size);
2799   TICK_ALLOC_TSO(new_stack_size,0);
2800
2801   /* copy the TSO block and the old stack into the new area */
2802   memcpy(dest,tso,TSO_STRUCT_SIZE);
2803   stack_words = tso->stack + tso->stack_size - tso->sp;
2804   new_sp = (P_)dest + new_tso_size - stack_words;
2805   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2806
2807   /* relocate the stack pointers... */
2808   dest->sp         = new_sp;
2809   dest->stack_size = new_stack_size;
2810         
2811   /* Mark the old TSO as relocated.  We have to check for relocated
2812    * TSOs in the garbage collector and any primops that deal with TSOs.
2813    *
2814    * It's important to set the sp value to just beyond the end
2815    * of the stack, so we don't attempt to scavenge any part of the
2816    * dead TSO's stack.
2817    */
2818   tso->what_next = ThreadRelocated;
2819   tso->link = dest;
2820   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2821   tso->why_blocked = NotBlocked;
2822
2823   IF_PAR_DEBUG(verbose,
2824                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2825                      tso->id, tso, tso->stack_size);
2826                /* If we're debugging, just print out the top of the stack */
2827                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2828                                                 tso->sp+64)));
2829   
2830   unlockTSO(dest);
2831   unlockTSO(tso);
2832
2833   IF_DEBUG(sanity,checkTSO(dest));
2834 #if 0
2835   IF_DEBUG(scheduler,printTSO(dest));
2836 #endif
2837
2838   return dest;
2839 }
2840
2841 /* ---------------------------------------------------------------------------
2842    Interrupt execution
2843    - usually called inside a signal handler so it mustn't do anything fancy.   
2844    ------------------------------------------------------------------------ */
2845
2846 void
2847 interruptStgRts(void)
2848 {
2849     sched_state = SCHED_INTERRUPTING;
2850     context_switch = 1;
2851     wakeUpRts();
2852 }
2853
2854 /* -----------------------------------------------------------------------------
2855    Wake up the RTS
2856    
2857    This function causes at least one OS thread to wake up and run the
2858    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2859    an external event has arrived that may need servicing (eg. a
2860    keyboard interrupt).
2861
2862    In the single-threaded RTS we don't do anything here; we only have
2863    one thread anyway, and the event that caused us to want to wake up
2864    will have interrupted any blocking system call in progress anyway.
2865    -------------------------------------------------------------------------- */
2866
2867 void
2868 wakeUpRts(void)
2869 {
2870 #if defined(THREADED_RTS)
2871     // This forces the IO Manager thread to wakeup, which will
2872     // in turn ensure that some OS thread wakes up and runs the
2873     // scheduler loop, which will cause a GC and deadlock check.
2874     ioManagerWakeup();
2875 #endif
2876 }
2877
2878 /* -----------------------------------------------------------------------------
2879  * checkBlackHoles()
2880  *
2881  * Check the blackhole_queue for threads that can be woken up.  We do
2882  * this periodically: before every GC, and whenever the run queue is
2883  * empty.
2884  *
2885  * An elegant solution might be to just wake up all the blocked
2886  * threads with awakenBlockedQueue occasionally: they'll go back to
2887  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2888  * doesn't give us a way to tell whether we've actually managed to
2889  * wake up any threads, so we would be busy-waiting.
2890  *
2891  * -------------------------------------------------------------------------- */
2892
2893 static rtsBool
2894 checkBlackHoles (Capability *cap)
2895 {
2896     StgTSO **prev, *t;
2897     rtsBool any_woke_up = rtsFalse;
2898     StgHalfWord type;
2899
2900     // blackhole_queue is global:
2901     ASSERT_LOCK_HELD(&sched_mutex);
2902
2903     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2904
2905     // ASSUMES: sched_mutex
2906     prev = &blackhole_queue;
2907     t = blackhole_queue;
2908     while (t != END_TSO_QUEUE) {
2909         ASSERT(t->why_blocked == BlockedOnBlackHole);
2910         type = get_itbl(t->block_info.closure)->type;
2911         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2912             IF_DEBUG(sanity,checkTSO(t));
2913             t = unblockOne(cap, t);
2914             // urk, the threads migrate to the current capability
2915             // here, but we'd like to keep them on the original one.
2916             *prev = t;
2917             any_woke_up = rtsTrue;
2918         } else {
2919             prev = &t->link;
2920             t = t->link;
2921         }
2922     }
2923
2924     return any_woke_up;
2925 }
2926
2927 /* -----------------------------------------------------------------------------
2928    Deleting threads
2929
2930    This is used for interruption (^C) and forking, and corresponds to
2931    raising an exception but without letting the thread catch the
2932    exception.
2933    -------------------------------------------------------------------------- */
2934
2935 static void 
2936 deleteThread (Capability *cap, StgTSO *tso)
2937 {
2938     // NOTE: must only be called on a TSO that we have exclusive
2939     // access to, because we will call throwToSingleThreaded() below.
2940     // The TSO must be on the run queue of the Capability we own, or 
2941     // we must own all Capabilities.
2942
2943     if (tso->why_blocked != BlockedOnCCall &&
2944         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2945         throwToSingleThreaded(cap,tso,NULL);
2946     }
2947 }
2948
2949 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2950 static void 
2951 deleteThread_(Capability *cap, StgTSO *tso)
2952 { // for forkProcess only:
2953   // like deleteThread(), but we delete threads in foreign calls, too.
2954
2955     if (tso->why_blocked == BlockedOnCCall ||
2956         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2957         unblockOne(cap,tso);
2958         tso->what_next = ThreadKilled;
2959     } else {
2960         deleteThread(cap,tso);
2961     }
2962 }
2963 #endif
2964
2965 /* -----------------------------------------------------------------------------
2966    raiseExceptionHelper
2967    
2968    This function is called by the raise# primitve, just so that we can
2969    move some of the tricky bits of raising an exception from C-- into
2970    C.  Who knows, it might be a useful re-useable thing here too.
2971    -------------------------------------------------------------------------- */
2972
2973 StgWord
2974 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2975 {
2976     Capability *cap = regTableToCapability(reg);
2977     StgThunk *raise_closure = NULL;
2978     StgPtr p, next;
2979     StgRetInfoTable *info;
2980     //
2981     // This closure represents the expression 'raise# E' where E
2982     // is the exception raise.  It is used to overwrite all the
2983     // thunks which are currently under evaluataion.
2984     //
2985
2986     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2987     // LDV profiling: stg_raise_info has THUNK as its closure
2988     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2989     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2990     // 1 does not cause any problem unless profiling is performed.
2991     // However, when LDV profiling goes on, we need to linearly scan
2992     // small object pool, where raise_closure is stored, so we should
2993     // use MIN_UPD_SIZE.
2994     //
2995     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2996     //                                 sizeofW(StgClosure)+1);
2997     //
2998
2999     //
3000     // Walk up the stack, looking for the catch frame.  On the way,
3001     // we update any closures pointed to from update frames with the
3002     // raise closure that we just built.
3003     //
3004     p = tso->sp;
3005     while(1) {
3006         info = get_ret_itbl((StgClosure *)p);
3007         next = p + stack_frame_sizeW((StgClosure *)p);
3008         switch (info->i.type) {
3009             
3010         case UPDATE_FRAME:
3011             // Only create raise_closure if we need to.
3012             if (raise_closure == NULL) {
3013                 raise_closure = 
3014                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3015                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3016                 raise_closure->payload[0] = exception;
3017             }
3018             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3019             p = next;
3020             continue;
3021
3022         case ATOMICALLY_FRAME:
3023             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
3024             tso->sp = p;
3025             return ATOMICALLY_FRAME;
3026             
3027         case CATCH_FRAME:
3028             tso->sp = p;
3029             return CATCH_FRAME;
3030
3031         case CATCH_STM_FRAME:
3032             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3033             tso->sp = p;
3034             return CATCH_STM_FRAME;
3035             
3036         case STOP_FRAME:
3037             tso->sp = p;
3038             return STOP_FRAME;
3039
3040         case CATCH_RETRY_FRAME:
3041         default:
3042             p = next; 
3043             continue;
3044         }
3045     }
3046 }
3047
3048
3049 /* -----------------------------------------------------------------------------
3050    findRetryFrameHelper
3051
3052    This function is called by the retry# primitive.  It traverses the stack
3053    leaving tso->sp referring to the frame which should handle the retry.  
3054
3055    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3056    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3057
3058    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3059    create) because retries are not considered to be exceptions, despite the
3060    similar implementation.
3061
3062    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3063    not be created within memory transactions.
3064    -------------------------------------------------------------------------- */
3065
3066 StgWord
3067 findRetryFrameHelper (StgTSO *tso)
3068 {
3069   StgPtr           p, next;
3070   StgRetInfoTable *info;
3071
3072   p = tso -> sp;
3073   while (1) {
3074     info = get_ret_itbl((StgClosure *)p);
3075     next = p + stack_frame_sizeW((StgClosure *)p);
3076     switch (info->i.type) {
3077       
3078     case ATOMICALLY_FRAME:
3079         debugTrace(DEBUG_stm,
3080                    "found ATOMICALLY_FRAME at %p during retry", p);
3081         tso->sp = p;
3082         return ATOMICALLY_FRAME;
3083       
3084     case CATCH_RETRY_FRAME:
3085         debugTrace(DEBUG_stm,
3086                    "found CATCH_RETRY_FRAME at %p during retrry", p);
3087         tso->sp = p;
3088         return CATCH_RETRY_FRAME;
3089       
3090     case CATCH_STM_FRAME: {
3091         debugTrace(DEBUG_stm,
3092                    "found CATCH_STM_FRAME at %p during retry", p);
3093         StgTRecHeader *trec = tso -> trec;
3094         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3095         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3096         stmAbortTransaction(tso -> cap, trec);
3097         stmFreeAbortedTRec(tso -> cap, trec);
3098         tso -> trec = outer;
3099         p = next; 
3100         continue;
3101     }
3102       
3103
3104     default:
3105       ASSERT(info->i.type != CATCH_FRAME);
3106       ASSERT(info->i.type != STOP_FRAME);
3107       p = next; 
3108       continue;
3109     }
3110   }
3111 }
3112
3113 /* -----------------------------------------------------------------------------
3114    resurrectThreads is called after garbage collection on the list of
3115    threads found to be garbage.  Each of these threads will be woken
3116    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3117    on an MVar, or NonTermination if the thread was blocked on a Black
3118    Hole.
3119
3120    Locks: assumes we hold *all* the capabilities.
3121    -------------------------------------------------------------------------- */
3122
3123 void
3124 resurrectThreads (StgTSO *threads)
3125 {
3126     StgTSO *tso, *next;
3127     Capability *cap;
3128
3129     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3130         next = tso->global_link;
3131         tso->global_link = all_threads;
3132         all_threads = tso;
3133         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3134         
3135         // Wake up the thread on the Capability it was last on
3136         cap = tso->cap;
3137         
3138         switch (tso->why_blocked) {
3139         case BlockedOnMVar:
3140         case BlockedOnException:
3141             /* Called by GC - sched_mutex lock is currently held. */
3142             throwToSingleThreaded(cap, tso,
3143                                   (StgClosure *)BlockedOnDeadMVar_closure);
3144             break;
3145         case BlockedOnBlackHole:
3146             throwToSingleThreaded(cap, tso,
3147                                   (StgClosure *)NonTermination_closure);
3148             break;
3149         case BlockedOnSTM:
3150             throwToSingleThreaded(cap, tso,
3151                                   (StgClosure *)BlockedIndefinitely_closure);
3152             break;
3153         case NotBlocked:
3154             /* This might happen if the thread was blocked on a black hole
3155              * belonging to a thread that we've just woken up (raiseAsync
3156              * can wake up threads, remember...).
3157              */
3158             continue;
3159         default:
3160             barf("resurrectThreads: thread blocked in a strange way");
3161         }
3162     }
3163 }