Don't traverse the entire list of threads on every GC (phase 1)
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #if defined(GRAN) || defined(PARALLEL_HASKELL)
35 # include "GranSimRts.h"
36 # include "GranSim.h"
37 # include "ParallelRts.h"
38 # include "Parallel.h"
39 # include "ParallelDebug.h"
40 # include "FetchMe.h"
41 # include "HLC.h"
42 #endif
43 #include "Sparks.h"
44 #include "Capability.h"
45 #include "Task.h"
46 #include "AwaitEvent.h"
47 #if defined(mingw32_HOST_OS)
48 #include "win32/IOManager.h"
49 #endif
50 #include "Trace.h"
51 #include "RaiseAsync.h"
52 #include "Threads.h"
53 #include "ThrIOManager.h"
54
55 #ifdef HAVE_SYS_TYPES_H
56 #include <sys/types.h>
57 #endif
58 #ifdef HAVE_UNISTD_H
59 #include <unistd.h>
60 #endif
61
62 #include <string.h>
63 #include <stdlib.h>
64 #include <stdarg.h>
65
66 #ifdef HAVE_ERRNO_H
67 #include <errno.h>
68 #endif
69
70 // Turn off inlining when debugging - it obfuscates things
71 #ifdef DEBUG
72 # undef  STATIC_INLINE
73 # define STATIC_INLINE static
74 #endif
75
76 /* -----------------------------------------------------------------------------
77  * Global variables
78  * -------------------------------------------------------------------------- */
79
80 #if defined(GRAN)
81
82 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
83 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
84
85 /* 
86    In GranSim we have a runnable and a blocked queue for each processor.
87    In order to minimise code changes new arrays run_queue_hds/tls
88    are created. run_queue_hd is then a short cut (macro) for
89    run_queue_hds[CurrentProc] (see GranSim.h).
90    -- HWL
91 */
92 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
93 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
94 StgTSO *ccalling_threadss[MAX_PROC];
95 /* We use the same global list of threads (all_threads) in GranSim as in
96    the std RTS (i.e. we are cheating). However, we don't use this list in
97    the GranSim specific code at the moment (so we are only potentially
98    cheating).  */
99
100 #else /* !GRAN */
101
102 #if !defined(THREADED_RTS)
103 // Blocked/sleeping thrads
104 StgTSO *blocked_queue_hd = NULL;
105 StgTSO *blocked_queue_tl = NULL;
106 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
107 #endif
108
109 /* Threads blocked on blackholes.
110  * LOCK: sched_mutex+capability, or all capabilities
111  */
112 StgTSO *blackhole_queue = NULL;
113 #endif
114
115 /* The blackhole_queue should be checked for threads to wake up.  See
116  * Schedule.h for more thorough comment.
117  * LOCK: none (doesn't matter if we miss an update)
118  */
119 rtsBool blackholes_need_checking = rtsFalse;
120
121 /* flag set by signal handler to precipitate a context switch
122  * LOCK: none (just an advisory flag)
123  */
124 int context_switch = 0;
125
126 /* flag that tracks whether we have done any execution in this time slice.
127  * LOCK: currently none, perhaps we should lock (but needs to be
128  * updated in the fast path of the scheduler).
129  */
130 nat recent_activity = ACTIVITY_YES;
131
132 /* if this flag is set as well, give up execution
133  * LOCK: none (changes once, from false->true)
134  */
135 rtsBool sched_state = SCHED_RUNNING;
136
137 #if defined(GRAN)
138 StgTSO *CurrentTSO;
139 #endif
140
141 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
142  *  exists - earlier gccs apparently didn't.
143  *  -= chak
144  */
145 StgTSO dummy_tso;
146
147 /*
148  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
149  * in an MT setting, needed to signal that a worker thread shouldn't hang around
150  * in the scheduler when it is out of work.
151  */
152 rtsBool shutting_down_scheduler = rtsFalse;
153
154 /*
155  * This mutex protects most of the global scheduler data in
156  * the THREADED_RTS runtime.
157  */
158 #if defined(THREADED_RTS)
159 Mutex sched_mutex;
160 #endif
161
162 #if defined(PARALLEL_HASKELL)
163 StgTSO *LastTSO;
164 rtsTime TimeOfLastYield;
165 rtsBool emitSchedule = rtsTrue;
166 #endif
167
168 #if !defined(mingw32_HOST_OS)
169 #define FORKPROCESS_PRIMOP_SUPPORTED
170 #endif
171
172 /* -----------------------------------------------------------------------------
173  * static function prototypes
174  * -------------------------------------------------------------------------- */
175
176 static Capability *schedule (Capability *initialCapability, Task *task);
177
178 //
179 // These function all encapsulate parts of the scheduler loop, and are
180 // abstracted only to make the structure and control flow of the
181 // scheduler clearer.
182 //
183 static void schedulePreLoop (void);
184 #if defined(THREADED_RTS)
185 static void schedulePushWork(Capability *cap, Task *task);
186 #endif
187 static void scheduleStartSignalHandlers (Capability *cap);
188 static void scheduleCheckBlockedThreads (Capability *cap);
189 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
190 static void scheduleCheckBlackHoles (Capability *cap);
191 static void scheduleDetectDeadlock (Capability *cap, Task *task);
192 #if defined(GRAN)
193 static StgTSO *scheduleProcessEvent(rtsEvent *event);
194 #endif
195 #if defined(PARALLEL_HASKELL)
196 static StgTSO *scheduleSendPendingMessages(void);
197 static void scheduleActivateSpark(void);
198 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
199 #endif
200 #if defined(PAR) || defined(GRAN)
201 static void scheduleGranParReport(void);
202 #endif
203 static void schedulePostRunThread(void);
204 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
205 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
206                                          StgTSO *t);
207 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
208                                     nat prev_what_next );
209 static void scheduleHandleThreadBlocked( StgTSO *t );
210 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
211                                              StgTSO *t );
212 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
213 static Capability *scheduleDoGC(Capability *cap, Task *task,
214                                 rtsBool force_major);
215
216 static rtsBool checkBlackHoles(Capability *cap);
217
218 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
219 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
220
221 static void deleteThread (Capability *cap, StgTSO *tso);
222 static void deleteAllThreads (Capability *cap);
223
224 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
225 static void deleteThread_(Capability *cap, StgTSO *tso);
226 #endif
227
228 #if defined(PARALLEL_HASKELL)
229 StgTSO * createSparkThread(rtsSpark spark);
230 StgTSO * activateSpark (rtsSpark spark);  
231 #endif
232
233 #ifdef DEBUG
234 static char *whatNext_strs[] = {
235   "(unknown)",
236   "ThreadRunGHC",
237   "ThreadInterpret",
238   "ThreadKilled",
239   "ThreadRelocated",
240   "ThreadComplete"
241 };
242 #endif
243
244 /* -----------------------------------------------------------------------------
245  * Putting a thread on the run queue: different scheduling policies
246  * -------------------------------------------------------------------------- */
247
248 STATIC_INLINE void
249 addToRunQueue( Capability *cap, StgTSO *t )
250 {
251 #if defined(PARALLEL_HASKELL)
252     if (RtsFlags.ParFlags.doFairScheduling) { 
253         // this does round-robin scheduling; good for concurrency
254         appendToRunQueue(cap,t);
255     } else {
256         // this does unfair scheduling; good for parallelism
257         pushOnRunQueue(cap,t);
258     }
259 #else
260     // this does round-robin scheduling; good for concurrency
261     appendToRunQueue(cap,t);
262 #endif
263 }
264
265 /* ---------------------------------------------------------------------------
266    Main scheduling loop.
267
268    We use round-robin scheduling, each thread returning to the
269    scheduler loop when one of these conditions is detected:
270
271       * out of heap space
272       * timer expires (thread yields)
273       * thread blocks
274       * thread ends
275       * stack overflow
276
277    GRAN version:
278      In a GranSim setup this loop iterates over the global event queue.
279      This revolves around the global event queue, which determines what 
280      to do next. Therefore, it's more complicated than either the 
281      concurrent or the parallel (GUM) setup.
282
283    GUM version:
284      GUM iterates over incoming messages.
285      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
286      and sends out a fish whenever it has nothing to do; in-between
287      doing the actual reductions (shared code below) it processes the
288      incoming messages and deals with delayed operations 
289      (see PendingFetches).
290      This is not the ugliest code you could imagine, but it's bloody close.
291
292    ------------------------------------------------------------------------ */
293
294 static Capability *
295 schedule (Capability *initialCapability, Task *task)
296 {
297   StgTSO *t;
298   Capability *cap;
299   StgThreadReturnCode ret;
300 #if defined(GRAN)
301   rtsEvent *event;
302 #elif defined(PARALLEL_HASKELL)
303   StgTSO *tso;
304   GlobalTaskId pe;
305   rtsBool receivedFinish = rtsFalse;
306 # if defined(DEBUG)
307   nat tp_size, sp_size; // stats only
308 # endif
309 #endif
310   nat prev_what_next;
311   rtsBool ready_to_gc;
312 #if defined(THREADED_RTS)
313   rtsBool first = rtsTrue;
314 #endif
315   
316   cap = initialCapability;
317
318   // Pre-condition: this task owns initialCapability.
319   // The sched_mutex is *NOT* held
320   // NB. on return, we still hold a capability.
321
322   debugTrace (DEBUG_sched, 
323               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
324               task, initialCapability);
325
326   schedulePreLoop();
327
328   // -----------------------------------------------------------
329   // Scheduler loop starts here:
330
331 #if defined(PARALLEL_HASKELL)
332 #define TERMINATION_CONDITION        (!receivedFinish)
333 #elif defined(GRAN)
334 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
335 #else
336 #define TERMINATION_CONDITION        rtsTrue
337 #endif
338
339   while (TERMINATION_CONDITION) {
340
341 #if defined(GRAN)
342       /* Choose the processor with the next event */
343       CurrentProc = event->proc;
344       CurrentTSO = event->tso;
345 #endif
346
347 #if defined(THREADED_RTS)
348       if (first) {
349           // don't yield the first time, we want a chance to run this
350           // thread for a bit, even if there are others banging at the
351           // door.
352           first = rtsFalse;
353           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
354       } else {
355           // Yield the capability to higher-priority tasks if necessary.
356           yieldCapability(&cap, task);
357       }
358 #endif
359       
360 #if defined(THREADED_RTS)
361       schedulePushWork(cap,task);
362 #endif
363
364     // Check whether we have re-entered the RTS from Haskell without
365     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
366     // call).
367     if (cap->in_haskell) {
368           errorBelch("schedule: re-entered unsafely.\n"
369                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
370           stg_exit(EXIT_FAILURE);
371     }
372
373     // The interruption / shutdown sequence.
374     // 
375     // In order to cleanly shut down the runtime, we want to:
376     //   * make sure that all main threads return to their callers
377     //     with the state 'Interrupted'.
378     //   * clean up all OS threads assocated with the runtime
379     //   * free all memory etc.
380     //
381     // So the sequence for ^C goes like this:
382     //
383     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
384     //     arranges for some Capability to wake up
385     //
386     //   * all threads in the system are halted, and the zombies are
387     //     placed on the run queue for cleaning up.  We acquire all
388     //     the capabilities in order to delete the threads, this is
389     //     done by scheduleDoGC() for convenience (because GC already
390     //     needs to acquire all the capabilities).  We can't kill
391     //     threads involved in foreign calls.
392     // 
393     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
394     //
395     //   * sched_state := SCHED_SHUTTING_DOWN
396     //
397     //   * all workers exit when the run queue on their capability
398     //     drains.  All main threads will also exit when their TSO
399     //     reaches the head of the run queue and they can return.
400     //
401     //   * eventually all Capabilities will shut down, and the RTS can
402     //     exit.
403     //
404     //   * We might be left with threads blocked in foreign calls, 
405     //     we should really attempt to kill these somehow (TODO);
406     
407     switch (sched_state) {
408     case SCHED_RUNNING:
409         break;
410     case SCHED_INTERRUPTING:
411         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
412 #if defined(THREADED_RTS)
413         discardSparksCap(cap);
414 #endif
415         /* scheduleDoGC() deletes all the threads */
416         cap = scheduleDoGC(cap,task,rtsFalse);
417         break;
418     case SCHED_SHUTTING_DOWN:
419         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
420         // If we are a worker, just exit.  If we're a bound thread
421         // then we will exit below when we've removed our TSO from
422         // the run queue.
423         if (task->tso == NULL && emptyRunQueue(cap)) {
424             return cap;
425         }
426         break;
427     default:
428         barf("sched_state: %d", sched_state);
429     }
430
431 #if defined(THREADED_RTS)
432     // If the run queue is empty, take a spark and turn it into a thread.
433     {
434         if (emptyRunQueue(cap)) {
435             StgClosure *spark;
436             spark = findSpark(cap);
437             if (spark != NULL) {
438                 debugTrace(DEBUG_sched,
439                            "turning spark of closure %p into a thread",
440                            (StgClosure *)spark);
441                 createSparkThread(cap,spark);     
442             }
443         }
444     }
445 #endif // THREADED_RTS
446
447     scheduleStartSignalHandlers(cap);
448
449     // Only check the black holes here if we've nothing else to do.
450     // During normal execution, the black hole list only gets checked
451     // at GC time, to avoid repeatedly traversing this possibly long
452     // list each time around the scheduler.
453     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
454
455     scheduleCheckWakeupThreads(cap);
456
457     scheduleCheckBlockedThreads(cap);
458
459     scheduleDetectDeadlock(cap,task);
460 #if defined(THREADED_RTS)
461     cap = task->cap;    // reload cap, it might have changed
462 #endif
463
464     // Normally, the only way we can get here with no threads to
465     // run is if a keyboard interrupt received during 
466     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
467     // Additionally, it is not fatal for the
468     // threaded RTS to reach here with no threads to run.
469     //
470     // win32: might be here due to awaitEvent() being abandoned
471     // as a result of a console event having been delivered.
472     if ( emptyRunQueue(cap) ) {
473 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
474         ASSERT(sched_state >= SCHED_INTERRUPTING);
475 #endif
476         continue; // nothing to do
477     }
478
479 #if defined(PARALLEL_HASKELL)
480     scheduleSendPendingMessages();
481     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
482         continue;
483
484 #if defined(SPARKS)
485     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
486 #endif
487
488     /* If we still have no work we need to send a FISH to get a spark
489        from another PE */
490     if (emptyRunQueue(cap)) {
491         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
492         ASSERT(rtsFalse); // should not happen at the moment
493     }
494     // from here: non-empty run queue.
495     //  TODO: merge above case with this, only one call processMessages() !
496     if (PacketsWaiting()) {  /* process incoming messages, if
497                                 any pending...  only in else
498                                 because getRemoteWork waits for
499                                 messages as well */
500         receivedFinish = processMessages();
501     }
502 #endif
503
504 #if defined(GRAN)
505     scheduleProcessEvent(event);
506 #endif
507
508     // 
509     // Get a thread to run
510     //
511     t = popRunQueue(cap);
512
513 #if defined(GRAN) || defined(PAR)
514     scheduleGranParReport(); // some kind of debuging output
515 #else
516     // Sanity check the thread we're about to run.  This can be
517     // expensive if there is lots of thread switching going on...
518     IF_DEBUG(sanity,checkTSO(t));
519 #endif
520
521 #if defined(THREADED_RTS)
522     // Check whether we can run this thread in the current task.
523     // If not, we have to pass our capability to the right task.
524     {
525         Task *bound = t->bound;
526       
527         if (bound) {
528             if (bound == task) {
529                 debugTrace(DEBUG_sched,
530                            "### Running thread %lu in bound thread", (unsigned long)t->id);
531                 // yes, the Haskell thread is bound to the current native thread
532             } else {
533                 debugTrace(DEBUG_sched,
534                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
535                 // no, bound to a different Haskell thread: pass to that thread
536                 pushOnRunQueue(cap,t);
537                 continue;
538             }
539         } else {
540             // The thread we want to run is unbound.
541             if (task->tso) { 
542                 debugTrace(DEBUG_sched,
543                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
544                 // no, the current native thread is bound to a different
545                 // Haskell thread, so pass it to any worker thread
546                 pushOnRunQueue(cap,t);
547                 continue; 
548             }
549         }
550     }
551 #endif
552
553     cap->r.rCurrentTSO = t;
554     
555     /* context switches are initiated by the timer signal, unless
556      * the user specified "context switch as often as possible", with
557      * +RTS -C0
558      */
559     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
560         && !emptyThreadQueues(cap)) {
561         context_switch = 1;
562     }
563          
564 run_thread:
565
566     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
567                               (long)t->id, whatNext_strs[t->what_next]);
568
569     startHeapProfTimer();
570
571     // Check for exceptions blocked on this thread
572     maybePerformBlockedException (cap, t);
573
574     // ----------------------------------------------------------------------
575     // Run the current thread 
576
577     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
578     ASSERT(t->cap == cap);
579
580     prev_what_next = t->what_next;
581
582     errno = t->saved_errno;
583 #if mingw32_HOST_OS
584     SetLastError(t->saved_winerror);
585 #endif
586
587     cap->in_haskell = rtsTrue;
588
589     dirty_TSO(cap,t);
590
591 #if defined(THREADED_RTS)
592     if (recent_activity == ACTIVITY_DONE_GC) {
593         // ACTIVITY_DONE_GC means we turned off the timer signal to
594         // conserve power (see #1623).  Re-enable it here.
595         nat prev;
596         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
597         if (prev == ACTIVITY_DONE_GC) {
598             startTimer();
599         }
600     } else {
601         recent_activity = ACTIVITY_YES;
602     }
603 #endif
604
605     switch (prev_what_next) {
606         
607     case ThreadKilled:
608     case ThreadComplete:
609         /* Thread already finished, return to scheduler. */
610         ret = ThreadFinished;
611         break;
612         
613     case ThreadRunGHC:
614     {
615         StgRegTable *r;
616         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
617         cap = regTableToCapability(r);
618         ret = r->rRet;
619         break;
620     }
621     
622     case ThreadInterpret:
623         cap = interpretBCO(cap);
624         ret = cap->r.rRet;
625         break;
626         
627     default:
628         barf("schedule: invalid what_next field");
629     }
630
631     cap->in_haskell = rtsFalse;
632
633     // The TSO might have moved, eg. if it re-entered the RTS and a GC
634     // happened.  So find the new location:
635     t = cap->r.rCurrentTSO;
636
637     // We have run some Haskell code: there might be blackhole-blocked
638     // threads to wake up now.
639     // Lock-free test here should be ok, we're just setting a flag.
640     if ( blackhole_queue != END_TSO_QUEUE ) {
641         blackholes_need_checking = rtsTrue;
642     }
643     
644     // And save the current errno in this thread.
645     // XXX: possibly bogus for SMP because this thread might already
646     // be running again, see code below.
647     t->saved_errno = errno;
648 #if mingw32_HOST_OS
649     // Similarly for Windows error code
650     t->saved_winerror = GetLastError();
651 #endif
652
653 #if defined(THREADED_RTS)
654     // If ret is ThreadBlocked, and this Task is bound to the TSO that
655     // blocked, we are in limbo - the TSO is now owned by whatever it
656     // is blocked on, and may in fact already have been woken up,
657     // perhaps even on a different Capability.  It may be the case
658     // that task->cap != cap.  We better yield this Capability
659     // immediately and return to normaility.
660     if (ret == ThreadBlocked) {
661         debugTrace(DEBUG_sched,
662                    "--<< thread %lu (%s) stopped: blocked",
663                    (unsigned long)t->id, whatNext_strs[t->what_next]);
664         continue;
665     }
666 #endif
667
668     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
669     ASSERT(t->cap == cap);
670
671     // ----------------------------------------------------------------------
672     
673     // Costs for the scheduler are assigned to CCS_SYSTEM
674     stopHeapProfTimer();
675 #if defined(PROFILING)
676     CCCS = CCS_SYSTEM;
677 #endif
678     
679     schedulePostRunThread();
680
681     t = threadStackUnderflow(task,t);
682
683     ready_to_gc = rtsFalse;
684
685     switch (ret) {
686     case HeapOverflow:
687         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
688         break;
689
690     case StackOverflow:
691         scheduleHandleStackOverflow(cap,task,t);
692         break;
693
694     case ThreadYielding:
695         if (scheduleHandleYield(cap, t, prev_what_next)) {
696             // shortcut for switching between compiler/interpreter:
697             goto run_thread; 
698         }
699         break;
700
701     case ThreadBlocked:
702         scheduleHandleThreadBlocked(t);
703         break;
704
705     case ThreadFinished:
706         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
707         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
708         break;
709
710     default:
711       barf("schedule: invalid thread return code %d", (int)ret);
712     }
713
714     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
715       cap = scheduleDoGC(cap,task,rtsFalse);
716     }
717   } /* end of while() */
718 }
719
720 /* ----------------------------------------------------------------------------
721  * Setting up the scheduler loop
722  * ------------------------------------------------------------------------- */
723
724 static void
725 schedulePreLoop(void)
726 {
727 #if defined(GRAN) 
728     /* set up first event to get things going */
729     /* ToDo: assign costs for system setup and init MainTSO ! */
730     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
731               ContinueThread, 
732               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
733     
734     debugTrace (DEBUG_gran,
735                 "GRAN: Init CurrentTSO (in schedule) = %p", 
736                 CurrentTSO);
737     IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
738     
739     if (RtsFlags.GranFlags.Light) {
740         /* Save current time; GranSim Light only */
741         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
742     }      
743 #endif
744 }
745
746 /* -----------------------------------------------------------------------------
747  * schedulePushWork()
748  *
749  * Push work to other Capabilities if we have some.
750  * -------------------------------------------------------------------------- */
751
752 #if defined(THREADED_RTS)
753 static void
754 schedulePushWork(Capability *cap USED_IF_THREADS, 
755                  Task *task      USED_IF_THREADS)
756 {
757     Capability *free_caps[n_capabilities], *cap0;
758     nat i, n_free_caps;
759
760     // migration can be turned off with +RTS -qg
761     if (!RtsFlags.ParFlags.migrate) return;
762
763     // Check whether we have more threads on our run queue, or sparks
764     // in our pool, that we could hand to another Capability.
765     if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
766         && sparkPoolSizeCap(cap) < 2) {
767         return;
768     }
769
770     // First grab as many free Capabilities as we can.
771     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
772         cap0 = &capabilities[i];
773         if (cap != cap0 && tryGrabCapability(cap0,task)) {
774             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
775                 // it already has some work, we just grabbed it at 
776                 // the wrong moment.  Or maybe it's deadlocked!
777                 releaseCapability(cap0);
778             } else {
779                 free_caps[n_free_caps++] = cap0;
780             }
781         }
782     }
783
784     // we now have n_free_caps free capabilities stashed in
785     // free_caps[].  Share our run queue equally with them.  This is
786     // probably the simplest thing we could do; improvements we might
787     // want to do include:
788     //
789     //   - giving high priority to moving relatively new threads, on 
790     //     the gournds that they haven't had time to build up a
791     //     working set in the cache on this CPU/Capability.
792     //
793     //   - giving low priority to moving long-lived threads
794
795     if (n_free_caps > 0) {
796         StgTSO *prev, *t, *next;
797         rtsBool pushed_to_all;
798
799         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
800
801         i = 0;
802         pushed_to_all = rtsFalse;
803
804         if (cap->run_queue_hd != END_TSO_QUEUE) {
805             prev = cap->run_queue_hd;
806             t = prev->_link;
807             prev->_link = END_TSO_QUEUE;
808             for (; t != END_TSO_QUEUE; t = next) {
809                 next = t->_link;
810                 t->_link = END_TSO_QUEUE;
811                 if (t->what_next == ThreadRelocated
812                     || t->bound == task // don't move my bound thread
813                     || tsoLocked(t)) {  // don't move a locked thread
814                     setTSOLink(cap, prev, t);
815                     prev = t;
816                 } else if (i == n_free_caps) {
817                     pushed_to_all = rtsTrue;
818                     i = 0;
819                     // keep one for us
820                     setTSOLink(cap, prev, t);
821                     prev = t;
822                 } else {
823                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
824                     appendToRunQueue(free_caps[i],t);
825                     if (t->bound) { t->bound->cap = free_caps[i]; }
826                     t->cap = free_caps[i];
827                     i++;
828                 }
829             }
830             cap->run_queue_tl = prev;
831         }
832
833         // If there are some free capabilities that we didn't push any
834         // threads to, then try to push a spark to each one.
835         if (!pushed_to_all) {
836             StgClosure *spark;
837             // i is the next free capability to push to
838             for (; i < n_free_caps; i++) {
839                 if (emptySparkPoolCap(free_caps[i])) {
840                     spark = findSpark(cap);
841                     if (spark != NULL) {
842                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
843                         newSpark(&(free_caps[i]->r), spark);
844                     }
845                 }
846             }
847         }
848
849         // release the capabilities
850         for (i = 0; i < n_free_caps; i++) {
851             task->cap = free_caps[i];
852             releaseCapability(free_caps[i]);
853         }
854     }
855     task->cap = cap; // reset to point to our Capability.
856 }
857 #endif
858
859 /* ----------------------------------------------------------------------------
860  * Start any pending signal handlers
861  * ------------------------------------------------------------------------- */
862
863 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
864 static void
865 scheduleStartSignalHandlers(Capability *cap)
866 {
867     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
868         // safe outside the lock
869         startSignalHandlers(cap);
870     }
871 }
872 #else
873 static void
874 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
875 {
876 }
877 #endif
878
879 /* ----------------------------------------------------------------------------
880  * Check for blocked threads that can be woken up.
881  * ------------------------------------------------------------------------- */
882
883 static void
884 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
885 {
886 #if !defined(THREADED_RTS)
887     //
888     // Check whether any waiting threads need to be woken up.  If the
889     // run queue is empty, and there are no other tasks running, we
890     // can wait indefinitely for something to happen.
891     //
892     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
893     {
894         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
895     }
896 #endif
897 }
898
899
900 /* ----------------------------------------------------------------------------
901  * Check for threads woken up by other Capabilities
902  * ------------------------------------------------------------------------- */
903
904 static void
905 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
906 {
907 #if defined(THREADED_RTS)
908     // Any threads that were woken up by other Capabilities get
909     // appended to our run queue.
910     if (!emptyWakeupQueue(cap)) {
911         ACQUIRE_LOCK(&cap->lock);
912         if (emptyRunQueue(cap)) {
913             cap->run_queue_hd = cap->wakeup_queue_hd;
914             cap->run_queue_tl = cap->wakeup_queue_tl;
915         } else {
916             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
917             cap->run_queue_tl = cap->wakeup_queue_tl;
918         }
919         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
920         RELEASE_LOCK(&cap->lock);
921     }
922 #endif
923 }
924
925 /* ----------------------------------------------------------------------------
926  * Check for threads blocked on BLACKHOLEs that can be woken up
927  * ------------------------------------------------------------------------- */
928 static void
929 scheduleCheckBlackHoles (Capability *cap)
930 {
931     if ( blackholes_need_checking ) // check without the lock first
932     {
933         ACQUIRE_LOCK(&sched_mutex);
934         if ( blackholes_need_checking ) {
935             checkBlackHoles(cap);
936             blackholes_need_checking = rtsFalse;
937         }
938         RELEASE_LOCK(&sched_mutex);
939     }
940 }
941
942 /* ----------------------------------------------------------------------------
943  * Detect deadlock conditions and attempt to resolve them.
944  * ------------------------------------------------------------------------- */
945
946 static void
947 scheduleDetectDeadlock (Capability *cap, Task *task)
948 {
949
950 #if defined(PARALLEL_HASKELL)
951     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
952     return;
953 #endif
954
955     /* 
956      * Detect deadlock: when we have no threads to run, there are no
957      * threads blocked, waiting for I/O, or sleeping, and all the
958      * other tasks are waiting for work, we must have a deadlock of
959      * some description.
960      */
961     if ( emptyThreadQueues(cap) )
962     {
963 #if defined(THREADED_RTS)
964         /* 
965          * In the threaded RTS, we only check for deadlock if there
966          * has been no activity in a complete timeslice.  This means
967          * we won't eagerly start a full GC just because we don't have
968          * any threads to run currently.
969          */
970         if (recent_activity != ACTIVITY_INACTIVE) return;
971 #endif
972
973         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
974
975         // Garbage collection can release some new threads due to
976         // either (a) finalizers or (b) threads resurrected because
977         // they are unreachable and will therefore be sent an
978         // exception.  Any threads thus released will be immediately
979         // runnable.
980         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
981
982         recent_activity = ACTIVITY_DONE_GC;
983         // disable timer signals (see #1623)
984         stopTimer();
985         
986         if ( !emptyRunQueue(cap) ) return;
987
988 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
989         /* If we have user-installed signal handlers, then wait
990          * for signals to arrive rather then bombing out with a
991          * deadlock.
992          */
993         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
994             debugTrace(DEBUG_sched,
995                        "still deadlocked, waiting for signals...");
996
997             awaitUserSignals();
998
999             if (signals_pending()) {
1000                 startSignalHandlers(cap);
1001             }
1002
1003             // either we have threads to run, or we were interrupted:
1004             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1005         }
1006 #endif
1007
1008 #if !defined(THREADED_RTS)
1009         /* Probably a real deadlock.  Send the current main thread the
1010          * Deadlock exception.
1011          */
1012         if (task->tso) {
1013             switch (task->tso->why_blocked) {
1014             case BlockedOnSTM:
1015             case BlockedOnBlackHole:
1016             case BlockedOnException:
1017             case BlockedOnMVar:
1018                 throwToSingleThreaded(cap, task->tso, 
1019                                       (StgClosure *)NonTermination_closure);
1020                 return;
1021             default:
1022                 barf("deadlock: main thread blocked in a strange way");
1023             }
1024         }
1025         return;
1026 #endif
1027     }
1028 }
1029
1030 /* ----------------------------------------------------------------------------
1031  * Process an event (GRAN only)
1032  * ------------------------------------------------------------------------- */
1033
1034 #if defined(GRAN)
1035 static StgTSO *
1036 scheduleProcessEvent(rtsEvent *event)
1037 {
1038     StgTSO *t;
1039
1040     if (RtsFlags.GranFlags.Light)
1041       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1042
1043     /* adjust time based on time-stamp */
1044     if (event->time > CurrentTime[CurrentProc] &&
1045         event->evttype != ContinueThread)
1046       CurrentTime[CurrentProc] = event->time;
1047     
1048     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1049     if (!RtsFlags.GranFlags.Light)
1050       handleIdlePEs();
1051
1052     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1053
1054     /* main event dispatcher in GranSim */
1055     switch (event->evttype) {
1056       /* Should just be continuing execution */
1057     case ContinueThread:
1058       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1059       /* ToDo: check assertion
1060       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1061              run_queue_hd != END_TSO_QUEUE);
1062       */
1063       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1064       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1065           procStatus[CurrentProc]==Fetching) {
1066         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1067               CurrentTSO->id, CurrentTSO, CurrentProc);
1068         goto next_thread;
1069       } 
1070       /* Ignore ContinueThreads for completed threads */
1071       if (CurrentTSO->what_next == ThreadComplete) {
1072         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1073               CurrentTSO->id, CurrentTSO, CurrentProc);
1074         goto next_thread;
1075       } 
1076       /* Ignore ContinueThreads for threads that are being migrated */
1077       if (PROCS(CurrentTSO)==Nowhere) { 
1078         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1079               CurrentTSO->id, CurrentTSO, CurrentProc);
1080         goto next_thread;
1081       }
1082       /* The thread should be at the beginning of the run queue */
1083       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1084         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1085               CurrentTSO->id, CurrentTSO, CurrentProc);
1086         break; // run the thread anyway
1087       }
1088       /*
1089       new_event(proc, proc, CurrentTime[proc],
1090                 FindWork,
1091                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1092       goto next_thread; 
1093       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1094       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1095
1096     case FetchNode:
1097       do_the_fetchnode(event);
1098       goto next_thread;             /* handle next event in event queue  */
1099       
1100     case GlobalBlock:
1101       do_the_globalblock(event);
1102       goto next_thread;             /* handle next event in event queue  */
1103       
1104     case FetchReply:
1105       do_the_fetchreply(event);
1106       goto next_thread;             /* handle next event in event queue  */
1107       
1108     case UnblockThread:   /* Move from the blocked queue to the tail of */
1109       do_the_unblock(event);
1110       goto next_thread;             /* handle next event in event queue  */
1111       
1112     case ResumeThread:  /* Move from the blocked queue to the tail of */
1113       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1114       event->tso->gran.blocktime += 
1115         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1116       do_the_startthread(event);
1117       goto next_thread;             /* handle next event in event queue  */
1118       
1119     case StartThread:
1120       do_the_startthread(event);
1121       goto next_thread;             /* handle next event in event queue  */
1122       
1123     case MoveThread:
1124       do_the_movethread(event);
1125       goto next_thread;             /* handle next event in event queue  */
1126       
1127     case MoveSpark:
1128       do_the_movespark(event);
1129       goto next_thread;             /* handle next event in event queue  */
1130       
1131     case FindWork:
1132       do_the_findwork(event);
1133       goto next_thread;             /* handle next event in event queue  */
1134       
1135     default:
1136       barf("Illegal event type %u\n", event->evttype);
1137     }  /* switch */
1138     
1139     /* This point was scheduler_loop in the old RTS */
1140
1141     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1142
1143     TimeOfLastEvent = CurrentTime[CurrentProc];
1144     TimeOfNextEvent = get_time_of_next_event();
1145     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1146     // CurrentTSO = ThreadQueueHd;
1147
1148     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1149                          TimeOfNextEvent));
1150
1151     if (RtsFlags.GranFlags.Light) 
1152       GranSimLight_leave_system(event, &ActiveTSO); 
1153
1154     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1155
1156     IF_DEBUG(gran, 
1157              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1158
1159     /* in a GranSim setup the TSO stays on the run queue */
1160     t = CurrentTSO;
1161     /* Take a thread from the run queue. */
1162     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1163
1164     IF_DEBUG(gran, 
1165              debugBelch("GRAN: About to run current thread, which is\n");
1166              G_TSO(t,5));
1167
1168     context_switch = 0; // turned on via GranYield, checking events and time slice
1169
1170     IF_DEBUG(gran, 
1171              DumpGranEvent(GR_SCHEDULE, t));
1172
1173     procStatus[CurrentProc] = Busy;
1174 }
1175 #endif // GRAN
1176
1177 /* ----------------------------------------------------------------------------
1178  * Send pending messages (PARALLEL_HASKELL only)
1179  * ------------------------------------------------------------------------- */
1180
1181 #if defined(PARALLEL_HASKELL)
1182 static StgTSO *
1183 scheduleSendPendingMessages(void)
1184 {
1185     StgSparkPool *pool;
1186     rtsSpark spark;
1187     StgTSO *t;
1188
1189 # if defined(PAR) // global Mem.Mgmt., omit for now
1190     if (PendingFetches != END_BF_QUEUE) {
1191         processFetches();
1192     }
1193 # endif
1194     
1195     if (RtsFlags.ParFlags.BufferTime) {
1196         // if we use message buffering, we must send away all message
1197         // packets which have become too old...
1198         sendOldBuffers(); 
1199     }
1200 }
1201 #endif
1202
1203 /* ----------------------------------------------------------------------------
1204  * Activate spark threads (PARALLEL_HASKELL only)
1205  * ------------------------------------------------------------------------- */
1206
1207 #if defined(PARALLEL_HASKELL)
1208 static void
1209 scheduleActivateSpark(void)
1210 {
1211 #if defined(SPARKS)
1212   ASSERT(emptyRunQueue());
1213 /* We get here if the run queue is empty and want some work.
1214    We try to turn a spark into a thread, and add it to the run queue,
1215    from where it will be picked up in the next iteration of the scheduler
1216    loop.
1217 */
1218
1219       /* :-[  no local threads => look out for local sparks */
1220       /* the spark pool for the current PE */
1221       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1222       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1223           pool->hd < pool->tl) {
1224         /* 
1225          * ToDo: add GC code check that we really have enough heap afterwards!!
1226          * Old comment:
1227          * If we're here (no runnable threads) and we have pending
1228          * sparks, we must have a space problem.  Get enough space
1229          * to turn one of those pending sparks into a
1230          * thread... 
1231          */
1232
1233         spark = findSpark(rtsFalse);            /* get a spark */
1234         if (spark != (rtsSpark) NULL) {
1235           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1236           IF_PAR_DEBUG(fish, // schedule,
1237                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1238                              tso->id, tso, advisory_thread_count));
1239
1240           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1241             IF_PAR_DEBUG(fish, // schedule,
1242                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1243                             spark));
1244             return rtsFalse; /* failed to generate a thread */
1245           }                  /* otherwise fall through & pick-up new tso */
1246         } else {
1247           IF_PAR_DEBUG(fish, // schedule,
1248                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1249                              spark_queue_len(pool)));
1250           return rtsFalse;  /* failed to generate a thread */
1251         }
1252         return rtsTrue;  /* success in generating a thread */
1253   } else { /* no more threads permitted or pool empty */
1254     return rtsFalse;  /* failed to generateThread */
1255   }
1256 #else
1257   tso = NULL; // avoid compiler warning only
1258   return rtsFalse;  /* dummy in non-PAR setup */
1259 #endif // SPARKS
1260 }
1261 #endif // PARALLEL_HASKELL
1262
1263 /* ----------------------------------------------------------------------------
1264  * Get work from a remote node (PARALLEL_HASKELL only)
1265  * ------------------------------------------------------------------------- */
1266     
1267 #if defined(PARALLEL_HASKELL)
1268 static rtsBool
1269 scheduleGetRemoteWork(rtsBool *receivedFinish)
1270 {
1271   ASSERT(emptyRunQueue());
1272
1273   if (RtsFlags.ParFlags.BufferTime) {
1274         IF_PAR_DEBUG(verbose, 
1275                 debugBelch("...send all pending data,"));
1276         {
1277           nat i;
1278           for (i=1; i<=nPEs; i++)
1279             sendImmediately(i); // send all messages away immediately
1280         }
1281   }
1282 # ifndef SPARKS
1283         //++EDEN++ idle() , i.e. send all buffers, wait for work
1284         // suppress fishing in EDEN... just look for incoming messages
1285         // (blocking receive)
1286   IF_PAR_DEBUG(verbose, 
1287                debugBelch("...wait for incoming messages...\n"));
1288   *receivedFinish = processMessages(); // blocking receive...
1289
1290         // and reenter scheduling loop after having received something
1291         // (return rtsFalse below)
1292
1293 # else /* activate SPARKS machinery */
1294 /* We get here, if we have no work, tried to activate a local spark, but still
1295    have no work. We try to get a remote spark, by sending a FISH message.
1296    Thread migration should be added here, and triggered when a sequence of 
1297    fishes returns without work. */
1298         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1299
1300       /* =8-[  no local sparks => look for work on other PEs */
1301         /*
1302          * We really have absolutely no work.  Send out a fish
1303          * (there may be some out there already), and wait for
1304          * something to arrive.  We clearly can't run any threads
1305          * until a SCHEDULE or RESUME arrives, and so that's what
1306          * we're hoping to see.  (Of course, we still have to
1307          * respond to other types of messages.)
1308          */
1309         rtsTime now = msTime() /*CURRENT_TIME*/;
1310         IF_PAR_DEBUG(verbose, 
1311                      debugBelch("--  now=%ld\n", now));
1312         IF_PAR_DEBUG(fish, // verbose,
1313              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1314                  (last_fish_arrived_at!=0 &&
1315                   last_fish_arrived_at+delay > now)) {
1316                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1317                      now, last_fish_arrived_at+delay, 
1318                      last_fish_arrived_at,
1319                      delay);
1320              });
1321   
1322         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1323             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1324           if (last_fish_arrived_at==0 ||
1325               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1326             /* outstandingFishes is set in sendFish, processFish;
1327                avoid flooding system with fishes via delay */
1328     next_fish_to_send_at = 0;  
1329   } else {
1330     /* ToDo: this should be done in the main scheduling loop to avoid the
1331              busy wait here; not so bad if fish delay is very small  */
1332     int iq = 0; // DEBUGGING -- HWL
1333     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1334     /* send a fish when ready, but process messages that arrive in the meantime */
1335     do {
1336       if (PacketsWaiting()) {
1337         iq++; // DEBUGGING
1338         *receivedFinish = processMessages();
1339       }
1340       now = msTime();
1341     } while (!*receivedFinish || now<next_fish_to_send_at);
1342     // JB: This means the fish could become obsolete, if we receive
1343     // work. Better check for work again? 
1344     // last line: while (!receivedFinish || !haveWork || now<...)
1345     // next line: if (receivedFinish || haveWork )
1346
1347     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1348       return rtsFalse;  // NB: this will leave scheduler loop
1349                         // immediately after return!
1350                           
1351     IF_PAR_DEBUG(fish, // verbose,
1352                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1353
1354   }
1355
1356     // JB: IMHO, this should all be hidden inside sendFish(...)
1357     /* pe = choosePE(); 
1358        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1359                 NEW_FISH_HUNGER);
1360
1361     // Global statistics: count no. of fishes
1362     if (RtsFlags.ParFlags.ParStats.Global &&
1363          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1364            globalParStats.tot_fish_mess++;
1365            }
1366     */ 
1367
1368   /* delayed fishes must have been sent by now! */
1369   next_fish_to_send_at = 0;  
1370   }
1371       
1372   *receivedFinish = processMessages();
1373 # endif /* SPARKS */
1374
1375  return rtsFalse;
1376  /* NB: this function always returns rtsFalse, meaning the scheduler
1377     loop continues with the next iteration; 
1378     rationale: 
1379       return code means success in finding work; we enter this function
1380       if there is no local work, thus have to send a fish which takes
1381       time until it arrives with work; in the meantime we should process
1382       messages in the main loop;
1383  */
1384 }
1385 #endif // PARALLEL_HASKELL
1386
1387 /* ----------------------------------------------------------------------------
1388  * PAR/GRAN: Report stats & debugging info(?)
1389  * ------------------------------------------------------------------------- */
1390
1391 #if defined(PAR) || defined(GRAN)
1392 static void
1393 scheduleGranParReport(void)
1394 {
1395   ASSERT(run_queue_hd != END_TSO_QUEUE);
1396
1397   /* Take a thread from the run queue, if we have work */
1398   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1399
1400     /* If this TSO has got its outport closed in the meantime, 
1401      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1402      * It has to be marked as TH_DEAD for this purpose.
1403      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1404
1405 JB: TODO: investigate wether state change field could be nuked
1406      entirely and replaced by the normal tso state (whatnext
1407      field). All we want to do is to kill tsos from outside.
1408      */
1409
1410     /* ToDo: write something to the log-file
1411     if (RTSflags.ParFlags.granSimStats && !sameThread)
1412         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1413
1414     CurrentTSO = t;
1415     */
1416     /* the spark pool for the current PE */
1417     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1418
1419     IF_DEBUG(scheduler, 
1420              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1421                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1422
1423     IF_PAR_DEBUG(fish,
1424              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1425                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1426
1427     if (RtsFlags.ParFlags.ParStats.Full && 
1428         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1429         (emitSchedule || // forced emit
1430          (t && LastTSO && t->id != LastTSO->id))) {
1431       /* 
1432          we are running a different TSO, so write a schedule event to log file
1433          NB: If we use fair scheduling we also have to write  a deschedule 
1434              event for LastTSO; with unfair scheduling we know that the
1435              previous tso has blocked whenever we switch to another tso, so
1436              we don't need it in GUM for now
1437       */
1438       IF_PAR_DEBUG(fish, // schedule,
1439                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1440
1441       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1442                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1443       emitSchedule = rtsFalse;
1444     }
1445 }     
1446 #endif
1447
1448 /* ----------------------------------------------------------------------------
1449  * After running a thread...
1450  * ------------------------------------------------------------------------- */
1451
1452 static void
1453 schedulePostRunThread(void)
1454 {
1455 #if defined(PAR)
1456     /* HACK 675: if the last thread didn't yield, make sure to print a 
1457        SCHEDULE event to the log file when StgRunning the next thread, even
1458        if it is the same one as before */
1459     LastTSO = t; 
1460     TimeOfLastYield = CURRENT_TIME;
1461 #endif
1462
1463   /* some statistics gathering in the parallel case */
1464
1465 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1466   switch (ret) {
1467     case HeapOverflow:
1468 # if defined(GRAN)
1469       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1470       globalGranStats.tot_heapover++;
1471 # elif defined(PAR)
1472       globalParStats.tot_heapover++;
1473 # endif
1474       break;
1475
1476      case StackOverflow:
1477 # if defined(GRAN)
1478       IF_DEBUG(gran, 
1479                DumpGranEvent(GR_DESCHEDULE, t));
1480       globalGranStats.tot_stackover++;
1481 # elif defined(PAR)
1482       // IF_DEBUG(par, 
1483       // DumpGranEvent(GR_DESCHEDULE, t);
1484       globalParStats.tot_stackover++;
1485 # endif
1486       break;
1487
1488     case ThreadYielding:
1489 # if defined(GRAN)
1490       IF_DEBUG(gran, 
1491                DumpGranEvent(GR_DESCHEDULE, t));
1492       globalGranStats.tot_yields++;
1493 # elif defined(PAR)
1494       // IF_DEBUG(par, 
1495       // DumpGranEvent(GR_DESCHEDULE, t);
1496       globalParStats.tot_yields++;
1497 # endif
1498       break; 
1499
1500     case ThreadBlocked:
1501 # if defined(GRAN)
1502         debugTrace(DEBUG_sched, 
1503                    "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1504                    t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1505                    (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1506                if (t->block_info.closure!=(StgClosure*)NULL)
1507                  print_bq(t->block_info.closure);
1508                debugBelch("\n"));
1509
1510       // ??? needed; should emit block before
1511       IF_DEBUG(gran, 
1512                DumpGranEvent(GR_DESCHEDULE, t)); 
1513       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1514       /*
1515         ngoq Dogh!
1516       ASSERT(procStatus[CurrentProc]==Busy || 
1517               ((procStatus[CurrentProc]==Fetching) && 
1518               (t->block_info.closure!=(StgClosure*)NULL)));
1519       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1520           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1521             procStatus[CurrentProc]==Fetching)) 
1522         procStatus[CurrentProc] = Idle;
1523       */
1524 # elif defined(PAR)
1525 //++PAR++  blockThread() writes the event (change?)
1526 # endif
1527     break;
1528
1529   case ThreadFinished:
1530     break;
1531
1532   default:
1533     barf("parGlobalStats: unknown return code");
1534     break;
1535     }
1536 #endif
1537 }
1538
1539 /* -----------------------------------------------------------------------------
1540  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1541  * -------------------------------------------------------------------------- */
1542
1543 static rtsBool
1544 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1545 {
1546     // did the task ask for a large block?
1547     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1548         // if so, get one and push it on the front of the nursery.
1549         bdescr *bd;
1550         lnat blocks;
1551         
1552         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1553         
1554         debugTrace(DEBUG_sched,
1555                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1556                    (long)t->id, whatNext_strs[t->what_next], blocks);
1557     
1558         // don't do this if the nursery is (nearly) full, we'll GC first.
1559         if (cap->r.rCurrentNursery->link != NULL ||
1560             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1561                                                // if the nursery has only one block.
1562             
1563             ACQUIRE_SM_LOCK
1564             bd = allocGroup( blocks );
1565             RELEASE_SM_LOCK
1566             cap->r.rNursery->n_blocks += blocks;
1567             
1568             // link the new group into the list
1569             bd->link = cap->r.rCurrentNursery;
1570             bd->u.back = cap->r.rCurrentNursery->u.back;
1571             if (cap->r.rCurrentNursery->u.back != NULL) {
1572                 cap->r.rCurrentNursery->u.back->link = bd;
1573             } else {
1574 #if !defined(THREADED_RTS)
1575                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1576                        g0s0 == cap->r.rNursery);
1577 #endif
1578                 cap->r.rNursery->blocks = bd;
1579             }             
1580             cap->r.rCurrentNursery->u.back = bd;
1581             
1582             // initialise it as a nursery block.  We initialise the
1583             // step, gen_no, and flags field of *every* sub-block in
1584             // this large block, because this is easier than making
1585             // sure that we always find the block head of a large
1586             // block whenever we call Bdescr() (eg. evacuate() and
1587             // isAlive() in the GC would both have to do this, at
1588             // least).
1589             { 
1590                 bdescr *x;
1591                 for (x = bd; x < bd + blocks; x++) {
1592                     x->step = cap->r.rNursery;
1593                     x->gen_no = 0;
1594                     x->flags = 0;
1595                 }
1596             }
1597             
1598             // This assert can be a killer if the app is doing lots
1599             // of large block allocations.
1600             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1601             
1602             // now update the nursery to point to the new block
1603             cap->r.rCurrentNursery = bd;
1604             
1605             // we might be unlucky and have another thread get on the
1606             // run queue before us and steal the large block, but in that
1607             // case the thread will just end up requesting another large
1608             // block.
1609             pushOnRunQueue(cap,t);
1610             return rtsFalse;  /* not actually GC'ing */
1611         }
1612     }
1613     
1614     debugTrace(DEBUG_sched,
1615                "--<< thread %ld (%s) stopped: HeapOverflow\n", 
1616                (long)t->id, whatNext_strs[t->what_next]);
1617
1618 #if defined(GRAN)
1619     ASSERT(!is_on_queue(t,CurrentProc));
1620 #elif defined(PARALLEL_HASKELL)
1621     /* Currently we emit a DESCHEDULE event before GC in GUM.
1622        ToDo: either add separate event to distinguish SYSTEM time from rest
1623        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1624     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1625         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1626                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1627         emitSchedule = rtsTrue;
1628     }
1629 #endif
1630       
1631     if (context_switch) {
1632         // Sometimes we miss a context switch, e.g. when calling
1633         // primitives in a tight loop, MAYBE_GC() doesn't check the
1634         // context switch flag, and we end up waiting for a GC.
1635         // See #1984, and concurrent/should_run/1984
1636         context_switch = 0;
1637         addToRunQueue(cap,t);
1638     } else {
1639         pushOnRunQueue(cap,t);
1640     }
1641     return rtsTrue;
1642     /* actual GC is done at the end of the while loop in schedule() */
1643 }
1644
1645 /* -----------------------------------------------------------------------------
1646  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1647  * -------------------------------------------------------------------------- */
1648
1649 static void
1650 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1651 {
1652     debugTrace (DEBUG_sched,
1653                 "--<< thread %ld (%s) stopped, StackOverflow", 
1654                 (long)t->id, whatNext_strs[t->what_next]);
1655
1656     /* just adjust the stack for this thread, then pop it back
1657      * on the run queue.
1658      */
1659     { 
1660         /* enlarge the stack */
1661         StgTSO *new_t = threadStackOverflow(cap, t);
1662         
1663         /* The TSO attached to this Task may have moved, so update the
1664          * pointer to it.
1665          */
1666         if (task->tso == t) {
1667             task->tso = new_t;
1668         }
1669         pushOnRunQueue(cap,new_t);
1670     }
1671 }
1672
1673 /* -----------------------------------------------------------------------------
1674  * Handle a thread that returned to the scheduler with ThreadYielding
1675  * -------------------------------------------------------------------------- */
1676
1677 static rtsBool
1678 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1679 {
1680     // Reset the context switch flag.  We don't do this just before
1681     // running the thread, because that would mean we would lose ticks
1682     // during GC, which can lead to unfair scheduling (a thread hogs
1683     // the CPU because the tick always arrives during GC).  This way
1684     // penalises threads that do a lot of allocation, but that seems
1685     // better than the alternative.
1686     context_switch = 0;
1687     
1688     /* put the thread back on the run queue.  Then, if we're ready to
1689      * GC, check whether this is the last task to stop.  If so, wake
1690      * up the GC thread.  getThread will block during a GC until the
1691      * GC is finished.
1692      */
1693 #ifdef DEBUG
1694     if (t->what_next != prev_what_next) {
1695         debugTrace(DEBUG_sched,
1696                    "--<< thread %ld (%s) stopped to switch evaluators", 
1697                    (long)t->id, whatNext_strs[t->what_next]);
1698     } else {
1699         debugTrace(DEBUG_sched,
1700                    "--<< thread %ld (%s) stopped, yielding",
1701                    (long)t->id, whatNext_strs[t->what_next]);
1702     }
1703 #endif
1704     
1705     IF_DEBUG(sanity,
1706              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1707              checkTSO(t));
1708     ASSERT(t->_link == END_TSO_QUEUE);
1709     
1710     // Shortcut if we're just switching evaluators: don't bother
1711     // doing stack squeezing (which can be expensive), just run the
1712     // thread.
1713     if (t->what_next != prev_what_next) {
1714         return rtsTrue;
1715     }
1716     
1717 #if defined(GRAN)
1718     ASSERT(!is_on_queue(t,CurrentProc));
1719       
1720     IF_DEBUG(sanity,
1721              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1722              checkThreadQsSanity(rtsTrue));
1723
1724 #endif
1725
1726     addToRunQueue(cap,t);
1727
1728 #if defined(GRAN)
1729     /* add a ContinueThread event to actually process the thread */
1730     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1731               ContinueThread,
1732               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1733     IF_GRAN_DEBUG(bq, 
1734                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1735                   G_EVENTQ(0);
1736                   G_CURR_THREADQ(0));
1737 #endif
1738     return rtsFalse;
1739 }
1740
1741 /* -----------------------------------------------------------------------------
1742  * Handle a thread that returned to the scheduler with ThreadBlocked
1743  * -------------------------------------------------------------------------- */
1744
1745 static void
1746 scheduleHandleThreadBlocked( StgTSO *t
1747 #if !defined(GRAN) && !defined(DEBUG)
1748     STG_UNUSED
1749 #endif
1750     )
1751 {
1752 #if defined(GRAN)
1753     IF_DEBUG(scheduler,
1754              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1755                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1756              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1757     
1758     // ??? needed; should emit block before
1759     IF_DEBUG(gran, 
1760              DumpGranEvent(GR_DESCHEDULE, t)); 
1761     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1762     /*
1763       ngoq Dogh!
1764       ASSERT(procStatus[CurrentProc]==Busy || 
1765       ((procStatus[CurrentProc]==Fetching) && 
1766       (t->block_info.closure!=(StgClosure*)NULL)));
1767       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1768       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1769       procStatus[CurrentProc]==Fetching)) 
1770       procStatus[CurrentProc] = Idle;
1771     */
1772 #elif defined(PAR)
1773     IF_DEBUG(scheduler,
1774              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1775                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1776     IF_PAR_DEBUG(bq,
1777                  
1778                  if (t->block_info.closure!=(StgClosure*)NULL) 
1779                  print_bq(t->block_info.closure));
1780     
1781     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1782     blockThread(t);
1783     
1784     /* whatever we schedule next, we must log that schedule */
1785     emitSchedule = rtsTrue;
1786     
1787 #else /* !GRAN */
1788
1789       // We don't need to do anything.  The thread is blocked, and it
1790       // has tidied up its stack and placed itself on whatever queue
1791       // it needs to be on.
1792
1793     // ASSERT(t->why_blocked != NotBlocked);
1794     // Not true: for example,
1795     //    - in THREADED_RTS, the thread may already have been woken
1796     //      up by another Capability.  This actually happens: try
1797     //      conc023 +RTS -N2.
1798     //    - the thread may have woken itself up already, because
1799     //      threadPaused() might have raised a blocked throwTo
1800     //      exception, see maybePerformBlockedException().
1801
1802 #ifdef DEBUG
1803     if (traceClass(DEBUG_sched)) {
1804         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1805                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1806         printThreadBlockage(t);
1807         debugTraceEnd();
1808     }
1809 #endif
1810     
1811     /* Only for dumping event to log file 
1812        ToDo: do I need this in GranSim, too?
1813        blockThread(t);
1814     */
1815 #endif
1816 }
1817
1818 /* -----------------------------------------------------------------------------
1819  * Handle a thread that returned to the scheduler with ThreadFinished
1820  * -------------------------------------------------------------------------- */
1821
1822 static rtsBool
1823 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1824 {
1825     /* Need to check whether this was a main thread, and if so,
1826      * return with the return value.
1827      *
1828      * We also end up here if the thread kills itself with an
1829      * uncaught exception, see Exception.cmm.
1830      */
1831     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1832                (unsigned long)t->id, whatNext_strs[t->what_next]);
1833
1834 #if defined(GRAN)
1835       endThread(t, CurrentProc); // clean-up the thread
1836 #elif defined(PARALLEL_HASKELL)
1837       /* For now all are advisory -- HWL */
1838       //if(t->priority==AdvisoryPriority) ??
1839       advisory_thread_count--; // JB: Caution with this counter, buggy!
1840       
1841 # if defined(DIST)
1842       if(t->dist.priority==RevalPriority)
1843         FinishReval(t);
1844 # endif
1845     
1846 # if defined(EDENOLD)
1847       // the thread could still have an outport... (BUG)
1848       if (t->eden.outport != -1) {
1849       // delete the outport for the tso which has finished...
1850         IF_PAR_DEBUG(eden_ports,
1851                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1852                               t->eden.outport, t->id));
1853         deleteOPT(t);
1854       }
1855       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1856       if (t->eden.epid != -1) {
1857         IF_PAR_DEBUG(eden_ports,
1858                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1859                            t->id, t->eden.epid));
1860         removeTSOfromProcess(t);
1861       }
1862 # endif 
1863
1864 # if defined(PAR)
1865       if (RtsFlags.ParFlags.ParStats.Full &&
1866           !RtsFlags.ParFlags.ParStats.Suppressed) 
1867         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1868
1869       //  t->par only contains statistics: left out for now...
1870       IF_PAR_DEBUG(fish,
1871                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1872                               t->id,t,t->par.sparkname));
1873 # endif
1874 #endif // PARALLEL_HASKELL
1875
1876       //
1877       // Check whether the thread that just completed was a bound
1878       // thread, and if so return with the result.  
1879       //
1880       // There is an assumption here that all thread completion goes
1881       // through this point; we need to make sure that if a thread
1882       // ends up in the ThreadKilled state, that it stays on the run
1883       // queue so it can be dealt with here.
1884       //
1885
1886       if (t->bound) {
1887
1888           if (t->bound != task) {
1889 #if !defined(THREADED_RTS)
1890               // Must be a bound thread that is not the topmost one.  Leave
1891               // it on the run queue until the stack has unwound to the
1892               // point where we can deal with this.  Leaving it on the run
1893               // queue also ensures that the garbage collector knows about
1894               // this thread and its return value (it gets dropped from the
1895               // step->threads list so there's no other way to find it).
1896               appendToRunQueue(cap,t);
1897               return rtsFalse;
1898 #else
1899               // this cannot happen in the threaded RTS, because a
1900               // bound thread can only be run by the appropriate Task.
1901               barf("finished bound thread that isn't mine");
1902 #endif
1903           }
1904
1905           ASSERT(task->tso == t);
1906
1907           if (t->what_next == ThreadComplete) {
1908               if (task->ret) {
1909                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1910                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1911               }
1912               task->stat = Success;
1913           } else {
1914               if (task->ret) {
1915                   *(task->ret) = NULL;
1916               }
1917               if (sched_state >= SCHED_INTERRUPTING) {
1918                   task->stat = Interrupted;
1919               } else {
1920                   task->stat = Killed;
1921               }
1922           }
1923 #ifdef DEBUG
1924           removeThreadLabel((StgWord)task->tso->id);
1925 #endif
1926           return rtsTrue; // tells schedule() to return
1927       }
1928
1929       return rtsFalse;
1930 }
1931
1932 /* -----------------------------------------------------------------------------
1933  * Perform a heap census
1934  * -------------------------------------------------------------------------- */
1935
1936 static rtsBool
1937 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1938 {
1939     // When we have +RTS -i0 and we're heap profiling, do a census at
1940     // every GC.  This lets us get repeatable runs for debugging.
1941     if (performHeapProfile ||
1942         (RtsFlags.ProfFlags.profileInterval==0 &&
1943          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1944         return rtsTrue;
1945     } else {
1946         return rtsFalse;
1947     }
1948 }
1949
1950 /* -----------------------------------------------------------------------------
1951  * Perform a garbage collection if necessary
1952  * -------------------------------------------------------------------------- */
1953
1954 static Capability *
1955 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1956 {
1957     StgTSO *t;
1958     rtsBool heap_census;
1959 #ifdef THREADED_RTS
1960     static volatile StgWord waiting_for_gc;
1961     rtsBool was_waiting;
1962     nat i;
1963 #endif
1964
1965 #ifdef THREADED_RTS
1966     // In order to GC, there must be no threads running Haskell code.
1967     // Therefore, the GC thread needs to hold *all* the capabilities,
1968     // and release them after the GC has completed.  
1969     //
1970     // This seems to be the simplest way: previous attempts involved
1971     // making all the threads with capabilities give up their
1972     // capabilities and sleep except for the *last* one, which
1973     // actually did the GC.  But it's quite hard to arrange for all
1974     // the other tasks to sleep and stay asleep.
1975     //
1976         
1977     was_waiting = cas(&waiting_for_gc, 0, 1);
1978     if (was_waiting) {
1979         do {
1980             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1981             if (cap) yieldCapability(&cap,task);
1982         } while (waiting_for_gc);
1983         return cap;  // NOTE: task->cap might have changed here
1984     }
1985
1986     for (i=0; i < n_capabilities; i++) {
1987         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1988         if (cap != &capabilities[i]) {
1989             Capability *pcap = &capabilities[i];
1990             // we better hope this task doesn't get migrated to
1991             // another Capability while we're waiting for this one.
1992             // It won't, because load balancing happens while we have
1993             // all the Capabilities, but even so it's a slightly
1994             // unsavoury invariant.
1995             task->cap = pcap;
1996             context_switch = 1;
1997             waitForReturnCapability(&pcap, task);
1998             if (pcap != &capabilities[i]) {
1999                 barf("scheduleDoGC: got the wrong capability");
2000             }
2001         }
2002     }
2003
2004     waiting_for_gc = rtsFalse;
2005 #endif
2006
2007     /* Kick any transactions which are invalid back to their
2008      * atomically frames.  When next scheduled they will try to
2009      * commit, this commit will fail and they will retry.
2010      */
2011     { 
2012         StgTSO *next;
2013         nat s;
2014
2015         for (s = 0; s < total_steps; s++) {
2016           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
2017             if (t->what_next == ThreadRelocated) {
2018                 next = t->_link;
2019             } else {
2020                 next = t->global_link;
2021                 
2022                 // This is a good place to check for blocked
2023                 // exceptions.  It might be the case that a thread is
2024                 // blocked on delivering an exception to a thread that
2025                 // is also blocked - we try to ensure that this
2026                 // doesn't happen in throwTo(), but it's too hard (or
2027                 // impossible) to close all the race holes, so we
2028                 // accept that some might get through and deal with
2029                 // them here.  A GC will always happen at some point,
2030                 // even if the system is otherwise deadlocked.
2031                 maybePerformBlockedException (&capabilities[0], t);
2032
2033                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2034                     if (!stmValidateNestOfTransactions (t -> trec)) {
2035                         debugTrace(DEBUG_sched | DEBUG_stm,
2036                                    "trec %p found wasting its time", t);
2037                         
2038                         // strip the stack back to the
2039                         // ATOMICALLY_FRAME, aborting the (nested)
2040                         // transaction, and saving the stack of any
2041                         // partially-evaluated thunks on the heap.
2042                         throwToSingleThreaded_(&capabilities[0], t, 
2043                                                NULL, rtsTrue, NULL);
2044                         
2045 #ifdef REG_R1
2046                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2047 #endif
2048                     }
2049                 }
2050             }
2051           }
2052         }
2053     }
2054     
2055     // so this happens periodically:
2056     if (cap) scheduleCheckBlackHoles(cap);
2057     
2058     IF_DEBUG(scheduler, printAllThreads());
2059
2060     /*
2061      * We now have all the capabilities; if we're in an interrupting
2062      * state, then we should take the opportunity to delete all the
2063      * threads in the system.
2064      */
2065     if (sched_state >= SCHED_INTERRUPTING) {
2066         deleteAllThreads(&capabilities[0]);
2067         sched_state = SCHED_SHUTTING_DOWN;
2068     }
2069     
2070     heap_census = scheduleNeedHeapProfile(rtsTrue);
2071
2072     /* everybody back, start the GC.
2073      * Could do it in this thread, or signal a condition var
2074      * to do it in another thread.  Either way, we need to
2075      * broadcast on gc_pending_cond afterward.
2076      */
2077 #if defined(THREADED_RTS)
2078     debugTrace(DEBUG_sched, "doing GC");
2079 #endif
2080     GarbageCollect(force_major || heap_census);
2081     
2082     if (heap_census) {
2083         debugTrace(DEBUG_sched, "performing heap census");
2084         heapCensus();
2085         performHeapProfile = rtsFalse;
2086     }
2087
2088 #if defined(THREADED_RTS)
2089     // release our stash of capabilities.
2090     for (i = 0; i < n_capabilities; i++) {
2091         if (cap != &capabilities[i]) {
2092             task->cap = &capabilities[i];
2093             releaseCapability(&capabilities[i]);
2094         }
2095     }
2096     if (cap) {
2097         task->cap = cap;
2098     } else {
2099         task->cap = NULL;
2100     }
2101 #endif
2102
2103 #if defined(GRAN)
2104     /* add a ContinueThread event to continue execution of current thread */
2105     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2106               ContinueThread,
2107               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2108     IF_GRAN_DEBUG(bq, 
2109                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2110                   G_EVENTQ(0);
2111                   G_CURR_THREADQ(0));
2112 #endif /* GRAN */
2113
2114     return cap;
2115 }
2116
2117 /* ---------------------------------------------------------------------------
2118  * Singleton fork(). Do not copy any running threads.
2119  * ------------------------------------------------------------------------- */
2120
2121 pid_t
2122 forkProcess(HsStablePtr *entry
2123 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2124             STG_UNUSED
2125 #endif
2126            )
2127 {
2128 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2129     Task *task;
2130     pid_t pid;
2131     StgTSO* t,*next;
2132     Capability *cap;
2133     nat s;
2134     
2135 #if defined(THREADED_RTS)
2136     if (RtsFlags.ParFlags.nNodes > 1) {
2137         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2138         stg_exit(EXIT_FAILURE);
2139     }
2140 #endif
2141
2142     debugTrace(DEBUG_sched, "forking!");
2143     
2144     // ToDo: for SMP, we should probably acquire *all* the capabilities
2145     cap = rts_lock();
2146     
2147     // no funny business: hold locks while we fork, otherwise if some
2148     // other thread is holding a lock when the fork happens, the data
2149     // structure protected by the lock will forever be in an
2150     // inconsistent state in the child.  See also #1391.
2151     ACQUIRE_LOCK(&sched_mutex);
2152     ACQUIRE_LOCK(&cap->lock);
2153     ACQUIRE_LOCK(&cap->running_task->lock);
2154
2155     pid = fork();
2156     
2157     if (pid) { // parent
2158         
2159         RELEASE_LOCK(&sched_mutex);
2160         RELEASE_LOCK(&cap->lock);
2161         RELEASE_LOCK(&cap->running_task->lock);
2162
2163         // just return the pid
2164         rts_unlock(cap);
2165         return pid;
2166         
2167     } else { // child
2168         
2169 #if defined(THREADED_RTS)
2170         initMutex(&sched_mutex);
2171         initMutex(&cap->lock);
2172         initMutex(&cap->running_task->lock);
2173 #endif
2174
2175         // Now, all OS threads except the thread that forked are
2176         // stopped.  We need to stop all Haskell threads, including
2177         // those involved in foreign calls.  Also we need to delete
2178         // all Tasks, because they correspond to OS threads that are
2179         // now gone.
2180
2181         for (s = 0; s < total_steps; s++) {
2182           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
2183             if (t->what_next == ThreadRelocated) {
2184                 next = t->_link;
2185             } else {
2186                 next = t->global_link;
2187                 // don't allow threads to catch the ThreadKilled
2188                 // exception, but we do want to raiseAsync() because these
2189                 // threads may be evaluating thunks that we need later.
2190                 deleteThread_(cap,t);
2191             }
2192           }
2193         }
2194         
2195         // Empty the run queue.  It seems tempting to let all the
2196         // killed threads stay on the run queue as zombies to be
2197         // cleaned up later, but some of them correspond to bound
2198         // threads for which the corresponding Task does not exist.
2199         cap->run_queue_hd = END_TSO_QUEUE;
2200         cap->run_queue_tl = END_TSO_QUEUE;
2201
2202         // Any suspended C-calling Tasks are no more, their OS threads
2203         // don't exist now:
2204         cap->suspended_ccalling_tasks = NULL;
2205
2206         // Empty the threads lists.  Otherwise, the garbage
2207         // collector may attempt to resurrect some of these threads.
2208         for (s = 0; s < total_steps; s++) {
2209             all_steps[s].threads = END_TSO_QUEUE;
2210         }
2211
2212         // Wipe the task list, except the current Task.
2213         ACQUIRE_LOCK(&sched_mutex);
2214         for (task = all_tasks; task != NULL; task=task->all_link) {
2215             if (task != cap->running_task) {
2216 #if defined(THREADED_RTS)
2217                 initMutex(&task->lock); // see #1391
2218 #endif
2219                 discardTask(task);
2220             }
2221         }
2222         RELEASE_LOCK(&sched_mutex);
2223
2224 #if defined(THREADED_RTS)
2225         // Wipe our spare workers list, they no longer exist.  New
2226         // workers will be created if necessary.
2227         cap->spare_workers = NULL;
2228         cap->returning_tasks_hd = NULL;
2229         cap->returning_tasks_tl = NULL;
2230 #endif
2231
2232         // On Unix, all timers are reset in the child, so we need to start
2233         // the timer again.
2234         initTimer();
2235         startTimer();
2236
2237         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2238         rts_checkSchedStatus("forkProcess",cap);
2239         
2240         rts_unlock(cap);
2241         hs_exit();                      // clean up and exit
2242         stg_exit(EXIT_SUCCESS);
2243     }
2244 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2245     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2246     return -1;
2247 #endif
2248 }
2249
2250 /* ---------------------------------------------------------------------------
2251  * Delete all the threads in the system
2252  * ------------------------------------------------------------------------- */
2253    
2254 static void
2255 deleteAllThreads ( Capability *cap )
2256 {
2257     // NOTE: only safe to call if we own all capabilities.
2258
2259     StgTSO* t, *next;
2260     nat s;
2261
2262     debugTrace(DEBUG_sched,"deleting all threads");
2263     for (s = 0; s < total_steps; s++) {
2264       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
2265         if (t->what_next == ThreadRelocated) {
2266             next = t->_link;
2267         } else {
2268             next = t->global_link;
2269             deleteThread(cap,t);
2270         }
2271       }
2272     }      
2273
2274     // The run queue now contains a bunch of ThreadKilled threads.  We
2275     // must not throw these away: the main thread(s) will be in there
2276     // somewhere, and the main scheduler loop has to deal with it.
2277     // Also, the run queue is the only thing keeping these threads from
2278     // being GC'd, and we don't want the "main thread has been GC'd" panic.
2279
2280 #if !defined(THREADED_RTS)
2281     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2282     ASSERT(sleeping_queue == END_TSO_QUEUE);
2283 #endif
2284 }
2285
2286 /* -----------------------------------------------------------------------------
2287    Managing the suspended_ccalling_tasks list.
2288    Locks required: sched_mutex
2289    -------------------------------------------------------------------------- */
2290
2291 STATIC_INLINE void
2292 suspendTask (Capability *cap, Task *task)
2293 {
2294     ASSERT(task->next == NULL && task->prev == NULL);
2295     task->next = cap->suspended_ccalling_tasks;
2296     task->prev = NULL;
2297     if (cap->suspended_ccalling_tasks) {
2298         cap->suspended_ccalling_tasks->prev = task;
2299     }
2300     cap->suspended_ccalling_tasks = task;
2301 }
2302
2303 STATIC_INLINE void
2304 recoverSuspendedTask (Capability *cap, Task *task)
2305 {
2306     if (task->prev) {
2307         task->prev->next = task->next;
2308     } else {
2309         ASSERT(cap->suspended_ccalling_tasks == task);
2310         cap->suspended_ccalling_tasks = task->next;
2311     }
2312     if (task->next) {
2313         task->next->prev = task->prev;
2314     }
2315     task->next = task->prev = NULL;
2316 }
2317
2318 /* ---------------------------------------------------------------------------
2319  * Suspending & resuming Haskell threads.
2320  * 
2321  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2322  * its capability before calling the C function.  This allows another
2323  * task to pick up the capability and carry on running Haskell
2324  * threads.  It also means that if the C call blocks, it won't lock
2325  * the whole system.
2326  *
2327  * The Haskell thread making the C call is put to sleep for the
2328  * duration of the call, on the susepended_ccalling_threads queue.  We
2329  * give out a token to the task, which it can use to resume the thread
2330  * on return from the C function.
2331  * ------------------------------------------------------------------------- */
2332    
2333 void *
2334 suspendThread (StgRegTable *reg)
2335 {
2336   Capability *cap;
2337   int saved_errno;
2338   StgTSO *tso;
2339   Task *task;
2340 #if mingw32_HOST_OS
2341   StgWord32 saved_winerror;
2342 #endif
2343
2344   saved_errno = errno;
2345 #if mingw32_HOST_OS
2346   saved_winerror = GetLastError();
2347 #endif
2348
2349   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2350    */
2351   cap = regTableToCapability(reg);
2352
2353   task = cap->running_task;
2354   tso = cap->r.rCurrentTSO;
2355
2356   debugTrace(DEBUG_sched, 
2357              "thread %lu did a safe foreign call", 
2358              (unsigned long)cap->r.rCurrentTSO->id);
2359
2360   // XXX this might not be necessary --SDM
2361   tso->what_next = ThreadRunGHC;
2362
2363   threadPaused(cap,tso);
2364
2365   if ((tso->flags & TSO_BLOCKEX) == 0)  {
2366       tso->why_blocked = BlockedOnCCall;
2367       tso->flags |= TSO_BLOCKEX;
2368       tso->flags &= ~TSO_INTERRUPTIBLE;
2369   } else {
2370       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2371   }
2372
2373   // Hand back capability
2374   task->suspended_tso = tso;
2375
2376   ACQUIRE_LOCK(&cap->lock);
2377
2378   suspendTask(cap,task);
2379   cap->in_haskell = rtsFalse;
2380   releaseCapability_(cap);
2381   
2382   RELEASE_LOCK(&cap->lock);
2383
2384 #if defined(THREADED_RTS)
2385   /* Preparing to leave the RTS, so ensure there's a native thread/task
2386      waiting to take over.
2387   */
2388   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2389 #endif
2390
2391   errno = saved_errno;
2392 #if mingw32_HOST_OS
2393   SetLastError(saved_winerror);
2394 #endif
2395   return task;
2396 }
2397
2398 StgRegTable *
2399 resumeThread (void *task_)
2400 {
2401     StgTSO *tso;
2402     Capability *cap;
2403     Task *task = task_;
2404     int saved_errno;
2405 #if mingw32_HOST_OS
2406     StgWord32 saved_winerror;
2407 #endif
2408
2409     saved_errno = errno;
2410 #if mingw32_HOST_OS
2411     saved_winerror = GetLastError();
2412 #endif
2413
2414     cap = task->cap;
2415     // Wait for permission to re-enter the RTS with the result.
2416     waitForReturnCapability(&cap,task);
2417     // we might be on a different capability now... but if so, our
2418     // entry on the suspended_ccalling_tasks list will also have been
2419     // migrated.
2420
2421     // Remove the thread from the suspended list
2422     recoverSuspendedTask(cap,task);
2423
2424     tso = task->suspended_tso;
2425     task->suspended_tso = NULL;
2426     tso->_link = END_TSO_QUEUE; // no write barrier reqd
2427     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2428     
2429     if (tso->why_blocked == BlockedOnCCall) {
2430         awakenBlockedExceptionQueue(cap,tso);
2431         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2432     }
2433     
2434     /* Reset blocking status */
2435     tso->why_blocked  = NotBlocked;
2436     
2437     cap->r.rCurrentTSO = tso;
2438     cap->in_haskell = rtsTrue;
2439     errno = saved_errno;
2440 #if mingw32_HOST_OS
2441     SetLastError(saved_winerror);
2442 #endif
2443
2444     /* We might have GC'd, mark the TSO dirty again */
2445     dirty_TSO(cap,tso);
2446
2447     IF_DEBUG(sanity, checkTSO(tso));
2448
2449     return &cap->r;
2450 }
2451
2452 /* ---------------------------------------------------------------------------
2453  * scheduleThread()
2454  *
2455  * scheduleThread puts a thread on the end  of the runnable queue.
2456  * This will usually be done immediately after a thread is created.
2457  * The caller of scheduleThread must create the thread using e.g.
2458  * createThread and push an appropriate closure
2459  * on this thread's stack before the scheduler is invoked.
2460  * ------------------------------------------------------------------------ */
2461
2462 void
2463 scheduleThread(Capability *cap, StgTSO *tso)
2464 {
2465     // The thread goes at the *end* of the run-queue, to avoid possible
2466     // starvation of any threads already on the queue.
2467     appendToRunQueue(cap,tso);
2468 }
2469
2470 void
2471 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2472 {
2473 #if defined(THREADED_RTS)
2474     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2475                               // move this thread from now on.
2476     cpu %= RtsFlags.ParFlags.nNodes;
2477     if (cpu == cap->no) {
2478         appendToRunQueue(cap,tso);
2479     } else {
2480         migrateThreadToCapability_lock(&capabilities[cpu],tso);
2481     }
2482 #else
2483     appendToRunQueue(cap,tso);
2484 #endif
2485 }
2486
2487 Capability *
2488 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2489 {
2490     Task *task;
2491
2492     // We already created/initialised the Task
2493     task = cap->running_task;
2494
2495     // This TSO is now a bound thread; make the Task and TSO
2496     // point to each other.
2497     tso->bound = task;
2498     tso->cap = cap;
2499
2500     task->tso = tso;
2501     task->ret = ret;
2502     task->stat = NoStatus;
2503
2504     appendToRunQueue(cap,tso);
2505
2506     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2507
2508 #if defined(GRAN)
2509     /* GranSim specific init */
2510     CurrentTSO = m->tso;                // the TSO to run
2511     procStatus[MainProc] = Busy;        // status of main PE
2512     CurrentProc = MainProc;             // PE to run it on
2513 #endif
2514
2515     cap = schedule(cap,task);
2516
2517     ASSERT(task->stat != NoStatus);
2518     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2519
2520     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2521     return cap;
2522 }
2523
2524 /* ----------------------------------------------------------------------------
2525  * Starting Tasks
2526  * ------------------------------------------------------------------------- */
2527
2528 #if defined(THREADED_RTS)
2529 void
2530 workerStart(Task *task)
2531 {
2532     Capability *cap;
2533
2534     // See startWorkerTask().
2535     ACQUIRE_LOCK(&task->lock);
2536     cap = task->cap;
2537     RELEASE_LOCK(&task->lock);
2538
2539     // set the thread-local pointer to the Task:
2540     taskEnter(task);
2541
2542     // schedule() runs without a lock.
2543     cap = schedule(cap,task);
2544
2545     // On exit from schedule(), we have a Capability.
2546     releaseCapability(cap);
2547     workerTaskStop(task);
2548 }
2549 #endif
2550
2551 /* ---------------------------------------------------------------------------
2552  * initScheduler()
2553  *
2554  * Initialise the scheduler.  This resets all the queues - if the
2555  * queues contained any threads, they'll be garbage collected at the
2556  * next pass.
2557  *
2558  * ------------------------------------------------------------------------ */
2559
2560 void 
2561 initScheduler(void)
2562 {
2563 #if defined(GRAN)
2564   nat i;
2565   for (i=0; i<=MAX_PROC; i++) {
2566     run_queue_hds[i]      = END_TSO_QUEUE;
2567     run_queue_tls[i]      = END_TSO_QUEUE;
2568     blocked_queue_hds[i]  = END_TSO_QUEUE;
2569     blocked_queue_tls[i]  = END_TSO_QUEUE;
2570     ccalling_threadss[i]  = END_TSO_QUEUE;
2571     blackhole_queue[i]    = END_TSO_QUEUE;
2572     sleeping_queue        = END_TSO_QUEUE;
2573   }
2574 #elif !defined(THREADED_RTS)
2575   blocked_queue_hd  = END_TSO_QUEUE;
2576   blocked_queue_tl  = END_TSO_QUEUE;
2577   sleeping_queue    = END_TSO_QUEUE;
2578 #endif
2579
2580   blackhole_queue   = END_TSO_QUEUE;
2581
2582   context_switch = 0;
2583   sched_state    = SCHED_RUNNING;
2584   recent_activity = ACTIVITY_YES;
2585
2586 #if defined(THREADED_RTS)
2587   /* Initialise the mutex and condition variables used by
2588    * the scheduler. */
2589   initMutex(&sched_mutex);
2590 #endif
2591   
2592   ACQUIRE_LOCK(&sched_mutex);
2593
2594   /* A capability holds the state a native thread needs in
2595    * order to execute STG code. At least one capability is
2596    * floating around (only THREADED_RTS builds have more than one).
2597    */
2598   initCapabilities();
2599
2600   initTaskManager();
2601
2602 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2603   initSparkPools();
2604 #endif
2605
2606 #if defined(THREADED_RTS)
2607   /*
2608    * Eagerly start one worker to run each Capability, except for
2609    * Capability 0.  The idea is that we're probably going to start a
2610    * bound thread on Capability 0 pretty soon, so we don't want a
2611    * worker task hogging it.
2612    */
2613   { 
2614       nat i;
2615       Capability *cap;
2616       for (i = 1; i < n_capabilities; i++) {
2617           cap = &capabilities[i];
2618           ACQUIRE_LOCK(&cap->lock);
2619           startWorkerTask(cap, workerStart);
2620           RELEASE_LOCK(&cap->lock);
2621       }
2622   }
2623 #endif
2624
2625   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2626
2627   RELEASE_LOCK(&sched_mutex);
2628 }
2629
2630 void
2631 exitScheduler(
2632     rtsBool wait_foreign
2633 #if !defined(THREADED_RTS)
2634                          __attribute__((unused))
2635 #endif
2636 )
2637                /* see Capability.c, shutdownCapability() */
2638 {
2639     Task *task = NULL;
2640
2641 #if defined(THREADED_RTS)
2642     ACQUIRE_LOCK(&sched_mutex);
2643     task = newBoundTask();
2644     RELEASE_LOCK(&sched_mutex);
2645 #endif
2646
2647     // If we haven't killed all the threads yet, do it now.
2648     if (sched_state < SCHED_SHUTTING_DOWN) {
2649         sched_state = SCHED_INTERRUPTING;
2650         scheduleDoGC(NULL,task,rtsFalse);    
2651     }
2652     sched_state = SCHED_SHUTTING_DOWN;
2653
2654 #if defined(THREADED_RTS)
2655     { 
2656         nat i;
2657         
2658         for (i = 0; i < n_capabilities; i++) {
2659             shutdownCapability(&capabilities[i], task, wait_foreign);
2660         }
2661         boundTaskExiting(task);
2662         stopTaskManager();
2663     }
2664 #else
2665     freeCapability(&MainCapability);
2666 #endif
2667 }
2668
2669 void
2670 freeScheduler( void )
2671 {
2672     freeTaskManager();
2673     if (n_capabilities != 1) {
2674         stgFree(capabilities);
2675     }
2676 #if defined(THREADED_RTS)
2677     closeMutex(&sched_mutex);
2678 #endif
2679 }
2680
2681 /* -----------------------------------------------------------------------------
2682    performGC
2683
2684    This is the interface to the garbage collector from Haskell land.
2685    We provide this so that external C code can allocate and garbage
2686    collect when called from Haskell via _ccall_GC.
2687    -------------------------------------------------------------------------- */
2688
2689 static void
2690 performGC_(rtsBool force_major)
2691 {
2692     Task *task;
2693     // We must grab a new Task here, because the existing Task may be
2694     // associated with a particular Capability, and chained onto the 
2695     // suspended_ccalling_tasks queue.
2696     ACQUIRE_LOCK(&sched_mutex);
2697     task = newBoundTask();
2698     RELEASE_LOCK(&sched_mutex);
2699     scheduleDoGC(NULL,task,force_major);
2700     boundTaskExiting(task);
2701 }
2702
2703 void
2704 performGC(void)
2705 {
2706     performGC_(rtsFalse);
2707 }
2708
2709 void
2710 performMajorGC(void)
2711 {
2712     performGC_(rtsTrue);
2713 }
2714
2715 /* -----------------------------------------------------------------------------
2716    Stack overflow
2717
2718    If the thread has reached its maximum stack size, then raise the
2719    StackOverflow exception in the offending thread.  Otherwise
2720    relocate the TSO into a larger chunk of memory and adjust its stack
2721    size appropriately.
2722    -------------------------------------------------------------------------- */
2723
2724 static StgTSO *
2725 threadStackOverflow(Capability *cap, StgTSO *tso)
2726 {
2727   nat new_stack_size, stack_words;
2728   lnat new_tso_size;
2729   StgPtr new_sp;
2730   StgTSO *dest;
2731
2732   IF_DEBUG(sanity,checkTSO(tso));
2733
2734   // don't allow throwTo() to modify the blocked_exceptions queue
2735   // while we are moving the TSO:
2736   lockClosure((StgClosure *)tso);
2737
2738   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2739       // NB. never raise a StackOverflow exception if the thread is
2740       // inside Control.Exceptino.block.  It is impractical to protect
2741       // against stack overflow exceptions, since virtually anything
2742       // can raise one (even 'catch'), so this is the only sensible
2743       // thing to do here.  See bug #767.
2744
2745       debugTrace(DEBUG_gc,
2746                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2747                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2748       IF_DEBUG(gc,
2749                /* If we're debugging, just print out the top of the stack */
2750                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2751                                                 tso->sp+64)));
2752
2753       // Send this thread the StackOverflow exception
2754       unlockTSO(tso);
2755       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2756       return tso;
2757   }
2758
2759   /* Try to double the current stack size.  If that takes us over the
2760    * maximum stack size for this thread, then use the maximum instead.
2761    * Finally round up so the TSO ends up as a whole number of blocks.
2762    */
2763   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2764   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2765                                        TSO_STRUCT_SIZE)/sizeof(W_);
2766   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2767   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2768
2769   debugTrace(DEBUG_sched, 
2770              "increasing stack size from %ld words to %d.",
2771              (long)tso->stack_size, new_stack_size);
2772
2773   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2774   TICK_ALLOC_TSO(new_stack_size,0);
2775
2776   /* copy the TSO block and the old stack into the new area */
2777   memcpy(dest,tso,TSO_STRUCT_SIZE);
2778   stack_words = tso->stack + tso->stack_size - tso->sp;
2779   new_sp = (P_)dest + new_tso_size - stack_words;
2780   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2781
2782   /* relocate the stack pointers... */
2783   dest->sp         = new_sp;
2784   dest->stack_size = new_stack_size;
2785         
2786   /* Mark the old TSO as relocated.  We have to check for relocated
2787    * TSOs in the garbage collector and any primops that deal with TSOs.
2788    *
2789    * It's important to set the sp value to just beyond the end
2790    * of the stack, so we don't attempt to scavenge any part of the
2791    * dead TSO's stack.
2792    */
2793   tso->what_next = ThreadRelocated;
2794   setTSOLink(cap,tso,dest);
2795   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2796   tso->why_blocked = NotBlocked;
2797
2798   IF_PAR_DEBUG(verbose,
2799                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2800                      tso->id, tso, tso->stack_size);
2801                /* If we're debugging, just print out the top of the stack */
2802                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2803                                                 tso->sp+64)));
2804   
2805   unlockTSO(dest);
2806   unlockTSO(tso);
2807
2808   IF_DEBUG(sanity,checkTSO(dest));
2809 #if 0
2810   IF_DEBUG(scheduler,printTSO(dest));
2811 #endif
2812
2813   return dest;
2814 }
2815
2816 static StgTSO *
2817 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2818 {
2819     bdescr *bd, *new_bd;
2820     lnat new_tso_size_w, tso_size_w;
2821     StgTSO *new_tso;
2822
2823     tso_size_w = tso_sizeW(tso);
2824
2825     if (tso_size_w < MBLOCK_SIZE_W || 
2826         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2827     {
2828         return tso;
2829     }
2830
2831     // don't allow throwTo() to modify the blocked_exceptions queue
2832     // while we are moving the TSO:
2833     lockClosure((StgClosure *)tso);
2834
2835     new_tso_size_w = round_to_mblocks(tso_size_w/2);
2836
2837     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2838                tso->id, tso_size_w, new_tso_size_w);
2839
2840     bd = Bdescr((StgPtr)tso);
2841     new_bd = splitLargeBlock(bd, new_tso_size_w / BLOCK_SIZE_W);
2842
2843     new_tso = (StgTSO *)new_bd->start;
2844     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2845     new_tso->stack_size = new_tso_size_w - TSO_STRUCT_SIZEW;
2846
2847     tso->what_next = ThreadRelocated;
2848     tso->_link = new_tso; // no write barrier reqd: same generation
2849
2850     // The TSO attached to this Task may have moved, so update the
2851     // pointer to it.
2852     if (task->tso == tso) {
2853         task->tso = new_tso;
2854     }
2855
2856     unlockTSO(new_tso);
2857     unlockTSO(tso);
2858
2859     IF_DEBUG(sanity,checkTSO(new_tso));
2860
2861     return new_tso;
2862 }
2863
2864 /* ---------------------------------------------------------------------------
2865    Interrupt execution
2866    - usually called inside a signal handler so it mustn't do anything fancy.   
2867    ------------------------------------------------------------------------ */
2868
2869 void
2870 interruptStgRts(void)
2871 {
2872     sched_state = SCHED_INTERRUPTING;
2873     context_switch = 1;
2874     wakeUpRts();
2875 }
2876
2877 /* -----------------------------------------------------------------------------
2878    Wake up the RTS
2879    
2880    This function causes at least one OS thread to wake up and run the
2881    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2882    an external event has arrived that may need servicing (eg. a
2883    keyboard interrupt).
2884
2885    In the single-threaded RTS we don't do anything here; we only have
2886    one thread anyway, and the event that caused us to want to wake up
2887    will have interrupted any blocking system call in progress anyway.
2888    -------------------------------------------------------------------------- */
2889
2890 void
2891 wakeUpRts(void)
2892 {
2893 #if defined(THREADED_RTS)
2894     // This forces the IO Manager thread to wakeup, which will
2895     // in turn ensure that some OS thread wakes up and runs the
2896     // scheduler loop, which will cause a GC and deadlock check.
2897     ioManagerWakeup();
2898 #endif
2899 }
2900
2901 /* -----------------------------------------------------------------------------
2902  * checkBlackHoles()
2903  *
2904  * Check the blackhole_queue for threads that can be woken up.  We do
2905  * this periodically: before every GC, and whenever the run queue is
2906  * empty.
2907  *
2908  * An elegant solution might be to just wake up all the blocked
2909  * threads with awakenBlockedQueue occasionally: they'll go back to
2910  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2911  * doesn't give us a way to tell whether we've actually managed to
2912  * wake up any threads, so we would be busy-waiting.
2913  *
2914  * -------------------------------------------------------------------------- */
2915
2916 static rtsBool
2917 checkBlackHoles (Capability *cap)
2918 {
2919     StgTSO **prev, *t;
2920     rtsBool any_woke_up = rtsFalse;
2921     StgHalfWord type;
2922
2923     // blackhole_queue is global:
2924     ASSERT_LOCK_HELD(&sched_mutex);
2925
2926     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2927
2928     // ASSUMES: sched_mutex
2929     prev = &blackhole_queue;
2930     t = blackhole_queue;
2931     while (t != END_TSO_QUEUE) {
2932         ASSERT(t->why_blocked == BlockedOnBlackHole);
2933         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2934         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2935             IF_DEBUG(sanity,checkTSO(t));
2936             t = unblockOne(cap, t);
2937             // urk, the threads migrate to the current capability
2938             // here, but we'd like to keep them on the original one.
2939             *prev = t;
2940             any_woke_up = rtsTrue;
2941         } else {
2942             prev = &t->_link;
2943             t = t->_link;
2944         }
2945     }
2946
2947     return any_woke_up;
2948 }
2949
2950 /* -----------------------------------------------------------------------------
2951    Deleting threads
2952
2953    This is used for interruption (^C) and forking, and corresponds to
2954    raising an exception but without letting the thread catch the
2955    exception.
2956    -------------------------------------------------------------------------- */
2957
2958 static void 
2959 deleteThread (Capability *cap, StgTSO *tso)
2960 {
2961     // NOTE: must only be called on a TSO that we have exclusive
2962     // access to, because we will call throwToSingleThreaded() below.
2963     // The TSO must be on the run queue of the Capability we own, or 
2964     // we must own all Capabilities.
2965
2966     if (tso->why_blocked != BlockedOnCCall &&
2967         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2968         throwToSingleThreaded(cap,tso,NULL);
2969     }
2970 }
2971
2972 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2973 static void 
2974 deleteThread_(Capability *cap, StgTSO *tso)
2975 { // for forkProcess only:
2976   // like deleteThread(), but we delete threads in foreign calls, too.
2977
2978     if (tso->why_blocked == BlockedOnCCall ||
2979         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2980         unblockOne(cap,tso);
2981         tso->what_next = ThreadKilled;
2982     } else {
2983         deleteThread(cap,tso);
2984     }
2985 }
2986 #endif
2987
2988 /* -----------------------------------------------------------------------------
2989    raiseExceptionHelper
2990    
2991    This function is called by the raise# primitve, just so that we can
2992    move some of the tricky bits of raising an exception from C-- into
2993    C.  Who knows, it might be a useful re-useable thing here too.
2994    -------------------------------------------------------------------------- */
2995
2996 StgWord
2997 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2998 {
2999     Capability *cap = regTableToCapability(reg);
3000     StgThunk *raise_closure = NULL;
3001     StgPtr p, next;
3002     StgRetInfoTable *info;
3003     //
3004     // This closure represents the expression 'raise# E' where E
3005     // is the exception raise.  It is used to overwrite all the
3006     // thunks which are currently under evaluataion.
3007     //
3008
3009     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
3010     // LDV profiling: stg_raise_info has THUNK as its closure
3011     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3012     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3013     // 1 does not cause any problem unless profiling is performed.
3014     // However, when LDV profiling goes on, we need to linearly scan
3015     // small object pool, where raise_closure is stored, so we should
3016     // use MIN_UPD_SIZE.
3017     //
3018     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3019     //                                 sizeofW(StgClosure)+1);
3020     //
3021
3022     //
3023     // Walk up the stack, looking for the catch frame.  On the way,
3024     // we update any closures pointed to from update frames with the
3025     // raise closure that we just built.
3026     //
3027     p = tso->sp;
3028     while(1) {
3029         info = get_ret_itbl((StgClosure *)p);
3030         next = p + stack_frame_sizeW((StgClosure *)p);
3031         switch (info->i.type) {
3032             
3033         case UPDATE_FRAME:
3034             // Only create raise_closure if we need to.
3035             if (raise_closure == NULL) {
3036                 raise_closure = 
3037                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3038                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3039                 raise_closure->payload[0] = exception;
3040             }
3041             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3042             p = next;
3043             continue;
3044
3045         case ATOMICALLY_FRAME:
3046             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
3047             tso->sp = p;
3048             return ATOMICALLY_FRAME;
3049             
3050         case CATCH_FRAME:
3051             tso->sp = p;
3052             return CATCH_FRAME;
3053
3054         case CATCH_STM_FRAME:
3055             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3056             tso->sp = p;
3057             return CATCH_STM_FRAME;
3058             
3059         case STOP_FRAME:
3060             tso->sp = p;
3061             return STOP_FRAME;
3062
3063         case CATCH_RETRY_FRAME:
3064         default:
3065             p = next; 
3066             continue;
3067         }
3068     }
3069 }
3070
3071
3072 /* -----------------------------------------------------------------------------
3073    findRetryFrameHelper
3074
3075    This function is called by the retry# primitive.  It traverses the stack
3076    leaving tso->sp referring to the frame which should handle the retry.  
3077
3078    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3079    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3080
3081    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3082    create) because retries are not considered to be exceptions, despite the
3083    similar implementation.
3084
3085    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3086    not be created within memory transactions.
3087    -------------------------------------------------------------------------- */
3088
3089 StgWord
3090 findRetryFrameHelper (StgTSO *tso)
3091 {
3092   StgPtr           p, next;
3093   StgRetInfoTable *info;
3094
3095   p = tso -> sp;
3096   while (1) {
3097     info = get_ret_itbl((StgClosure *)p);
3098     next = p + stack_frame_sizeW((StgClosure *)p);
3099     switch (info->i.type) {
3100       
3101     case ATOMICALLY_FRAME:
3102         debugTrace(DEBUG_stm,
3103                    "found ATOMICALLY_FRAME at %p during retry", p);
3104         tso->sp = p;
3105         return ATOMICALLY_FRAME;
3106       
3107     case CATCH_RETRY_FRAME:
3108         debugTrace(DEBUG_stm,
3109                    "found CATCH_RETRY_FRAME at %p during retrry", p);
3110         tso->sp = p;
3111         return CATCH_RETRY_FRAME;
3112       
3113     case CATCH_STM_FRAME: {
3114         StgTRecHeader *trec = tso -> trec;
3115         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3116         debugTrace(DEBUG_stm,
3117                    "found CATCH_STM_FRAME at %p during retry", p);
3118         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3119         stmAbortTransaction(tso -> cap, trec);
3120         stmFreeAbortedTRec(tso -> cap, trec);
3121         tso -> trec = outer;
3122         p = next; 
3123         continue;
3124     }
3125       
3126
3127     default:
3128       ASSERT(info->i.type != CATCH_FRAME);
3129       ASSERT(info->i.type != STOP_FRAME);
3130       p = next; 
3131       continue;
3132     }
3133   }
3134 }
3135
3136 /* -----------------------------------------------------------------------------
3137    resurrectThreads is called after garbage collection on the list of
3138    threads found to be garbage.  Each of these threads will be woken
3139    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3140    on an MVar, or NonTermination if the thread was blocked on a Black
3141    Hole.
3142
3143    Locks: assumes we hold *all* the capabilities.
3144    -------------------------------------------------------------------------- */
3145
3146 void
3147 resurrectThreads (StgTSO *threads)
3148 {
3149     StgTSO *tso, *next;
3150     Capability *cap;
3151     step *step;
3152
3153     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3154         next = tso->global_link;
3155
3156         step = Bdescr((P_)tso)->step;
3157         tso->global_link = step->threads;
3158         step->threads = tso;
3159
3160         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3161         
3162         // Wake up the thread on the Capability it was last on
3163         cap = tso->cap;
3164         
3165         switch (tso->why_blocked) {
3166         case BlockedOnMVar:
3167         case BlockedOnException:
3168             /* Called by GC - sched_mutex lock is currently held. */
3169             throwToSingleThreaded(cap, tso,
3170                                   (StgClosure *)BlockedOnDeadMVar_closure);
3171             break;
3172         case BlockedOnBlackHole:
3173             throwToSingleThreaded(cap, tso,
3174                                   (StgClosure *)NonTermination_closure);
3175             break;
3176         case BlockedOnSTM:
3177             throwToSingleThreaded(cap, tso,
3178                                   (StgClosure *)BlockedIndefinitely_closure);
3179             break;
3180         case NotBlocked:
3181             /* This might happen if the thread was blocked on a black hole
3182              * belonging to a thread that we've just woken up (raiseAsync
3183              * can wake up threads, remember...).
3184              */
3185             continue;
3186         default:
3187             barf("resurrectThreads: thread blocked in a strange way");
3188         }
3189     }
3190 }