[project @ 2005-11-02 12:26:21 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50
51 #ifdef HAVE_SYS_TYPES_H
52 #include <sys/types.h>
53 #endif
54 #ifdef HAVE_UNISTD_H
55 #include <unistd.h>
56 #endif
57
58 #include <string.h>
59 #include <stdlib.h>
60 #include <stdarg.h>
61
62 #ifdef HAVE_ERRNO_H
63 #include <errno.h>
64 #endif
65
66 // Turn off inlining when debugging - it obfuscates things
67 #ifdef DEBUG
68 # undef  STATIC_INLINE
69 # define STATIC_INLINE static
70 #endif
71
72 #ifdef THREADED_RTS
73 #define USED_WHEN_THREADED_RTS
74 #define USED_WHEN_NON_THREADED_RTS STG_UNUSED
75 #else
76 #define USED_WHEN_THREADED_RTS     STG_UNUSED
77 #define USED_WHEN_NON_THREADED_RTS
78 #endif
79
80 #ifdef SMP
81 #define USED_WHEN_SMP
82 #else
83 #define USED_WHEN_SMP STG_UNUSED
84 #endif
85
86 /* -----------------------------------------------------------------------------
87  * Global variables
88  * -------------------------------------------------------------------------- */
89
90 #if defined(GRAN)
91
92 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
93 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
94
95 /* 
96    In GranSim we have a runnable and a blocked queue for each processor.
97    In order to minimise code changes new arrays run_queue_hds/tls
98    are created. run_queue_hd is then a short cut (macro) for
99    run_queue_hds[CurrentProc] (see GranSim.h).
100    -- HWL
101 */
102 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
103 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
104 StgTSO *ccalling_threadss[MAX_PROC];
105 /* We use the same global list of threads (all_threads) in GranSim as in
106    the std RTS (i.e. we are cheating). However, we don't use this list in
107    the GranSim specific code at the moment (so we are only potentially
108    cheating).  */
109
110 #else /* !GRAN */
111
112 #if !defined(THREADED_RTS)
113 // Blocked/sleeping thrads
114 StgTSO *blocked_queue_hd = NULL;
115 StgTSO *blocked_queue_tl = NULL;
116 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
117 #endif
118
119 /* Threads blocked on blackholes.
120  * LOCK: sched_mutex+capability, or all capabilities
121  */
122 StgTSO *blackhole_queue = NULL;
123 #endif
124
125 /* The blackhole_queue should be checked for threads to wake up.  See
126  * Schedule.h for more thorough comment.
127  * LOCK: none (doesn't matter if we miss an update)
128  */
129 rtsBool blackholes_need_checking = rtsFalse;
130
131 /* Linked list of all threads.
132  * Used for detecting garbage collected threads.
133  * LOCK: sched_mutex+capability, or all capabilities
134  */
135 StgTSO *all_threads = NULL;
136
137 /* flag set by signal handler to precipitate a context switch
138  * LOCK: none (just an advisory flag)
139  */
140 int context_switch = 0;
141
142 /* flag that tracks whether we have done any execution in this time slice.
143  * LOCK: currently none, perhaps we should lock (but needs to be
144  * updated in the fast path of the scheduler).
145  */
146 nat recent_activity = ACTIVITY_YES;
147
148 /* if this flag is set as well, give up execution
149  * LOCK: none (changes once, from false->true)
150  */
151 rtsBool interrupted = rtsFalse;
152
153 /* Next thread ID to allocate.
154  * LOCK: sched_mutex
155  */
156 static StgThreadID next_thread_id = 1;
157
158 /* The smallest stack size that makes any sense is:
159  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
160  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
161  *  + 1                       (the closure to enter)
162  *  + 1                       (stg_ap_v_ret)
163  *  + 1                       (spare slot req'd by stg_ap_v_ret)
164  *
165  * A thread with this stack will bomb immediately with a stack
166  * overflow, which will increase its stack size.  
167  */
168 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
169
170 #if defined(GRAN)
171 StgTSO *CurrentTSO;
172 #endif
173
174 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
175  *  exists - earlier gccs apparently didn't.
176  *  -= chak
177  */
178 StgTSO dummy_tso;
179
180 /*
181  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
182  * in an MT setting, needed to signal that a worker thread shouldn't hang around
183  * in the scheduler when it is out of work.
184  */
185 rtsBool shutting_down_scheduler = rtsFalse;
186
187 /*
188  * This mutex protects most of the global scheduler data in
189  * the THREADED_RTS and (inc. SMP) runtime.
190  */
191 #if defined(THREADED_RTS)
192 Mutex sched_mutex = INIT_MUTEX_VAR;
193 #endif
194
195 #if defined(PARALLEL_HASKELL)
196 StgTSO *LastTSO;
197 rtsTime TimeOfLastYield;
198 rtsBool emitSchedule = rtsTrue;
199 #endif
200
201 /* -----------------------------------------------------------------------------
202  * static function prototypes
203  * -------------------------------------------------------------------------- */
204
205 static Capability *schedule (Capability *initialCapability, Task *task);
206
207 //
208 // These function all encapsulate parts of the scheduler loop, and are
209 // abstracted only to make the structure and control flow of the
210 // scheduler clearer.
211 //
212 static void schedulePreLoop (void);
213 #if defined(SMP)
214 static void schedulePushWork(Capability *cap, Task *task);
215 #endif
216 static void scheduleStartSignalHandlers (void);
217 static void scheduleCheckBlockedThreads (Capability *cap);
218 static void scheduleCheckBlackHoles (Capability *cap);
219 static void scheduleDetectDeadlock (Capability *cap, Task *task);
220 #if defined(GRAN)
221 static StgTSO *scheduleProcessEvent(rtsEvent *event);
222 #endif
223 #if defined(PARALLEL_HASKELL)
224 static StgTSO *scheduleSendPendingMessages(void);
225 static void scheduleActivateSpark(void);
226 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
227 #endif
228 #if defined(PAR) || defined(GRAN)
229 static void scheduleGranParReport(void);
230 #endif
231 static void schedulePostRunThread(void);
232 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
233 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
234                                          StgTSO *t);
235 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
236                                     nat prev_what_next );
237 static void scheduleHandleThreadBlocked( StgTSO *t );
238 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
239                                              StgTSO *t );
240 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
241 static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);
242
243 static void unblockThread(Capability *cap, StgTSO *tso);
244 static rtsBool checkBlackHoles(Capability *cap);
245 static void AllRoots(evac_fn evac);
246
247 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
248
249 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
250                         rtsBool stop_at_atomically);
251
252 static void deleteThread (Capability *cap, StgTSO *tso);
253 static void deleteRunQueue (Capability *cap);
254
255 #ifdef DEBUG
256 static void printThreadBlockage(StgTSO *tso);
257 static void printThreadStatus(StgTSO *tso);
258 void printThreadQueue(StgTSO *tso);
259 #endif
260
261 #if defined(PARALLEL_HASKELL)
262 StgTSO * createSparkThread(rtsSpark spark);
263 StgTSO * activateSpark (rtsSpark spark);  
264 #endif
265
266 #ifdef DEBUG
267 static char *whatNext_strs[] = {
268   "(unknown)",
269   "ThreadRunGHC",
270   "ThreadInterpret",
271   "ThreadKilled",
272   "ThreadRelocated",
273   "ThreadComplete"
274 };
275 #endif
276
277 /* -----------------------------------------------------------------------------
278  * Putting a thread on the run queue: different scheduling policies
279  * -------------------------------------------------------------------------- */
280
281 STATIC_INLINE void
282 addToRunQueue( Capability *cap, StgTSO *t )
283 {
284 #if defined(PARALLEL_HASKELL)
285     if (RtsFlags.ParFlags.doFairScheduling) { 
286         // this does round-robin scheduling; good for concurrency
287         appendToRunQueue(cap,t);
288     } else {
289         // this does unfair scheduling; good for parallelism
290         pushOnRunQueue(cap,t);
291     }
292 #else
293     // this does round-robin scheduling; good for concurrency
294     appendToRunQueue(cap,t);
295 #endif
296 }
297
298 /* ---------------------------------------------------------------------------
299    Main scheduling loop.
300
301    We use round-robin scheduling, each thread returning to the
302    scheduler loop when one of these conditions is detected:
303
304       * out of heap space
305       * timer expires (thread yields)
306       * thread blocks
307       * thread ends
308       * stack overflow
309
310    GRAN version:
311      In a GranSim setup this loop iterates over the global event queue.
312      This revolves around the global event queue, which determines what 
313      to do next. Therefore, it's more complicated than either the 
314      concurrent or the parallel (GUM) setup.
315
316    GUM version:
317      GUM iterates over incoming messages.
318      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
319      and sends out a fish whenever it has nothing to do; in-between
320      doing the actual reductions (shared code below) it processes the
321      incoming messages and deals with delayed operations 
322      (see PendingFetches).
323      This is not the ugliest code you could imagine, but it's bloody close.
324
325    ------------------------------------------------------------------------ */
326
327 static Capability *
328 schedule (Capability *initialCapability, Task *task)
329 {
330   StgTSO *t;
331   Capability *cap;
332   StgThreadReturnCode ret;
333 #if defined(GRAN)
334   rtsEvent *event;
335 #elif defined(PARALLEL_HASKELL)
336   StgTSO *tso;
337   GlobalTaskId pe;
338   rtsBool receivedFinish = rtsFalse;
339 # if defined(DEBUG)
340   nat tp_size, sp_size; // stats only
341 # endif
342 #endif
343   nat prev_what_next;
344   rtsBool ready_to_gc;
345 #if defined(THREADED_RTS)
346   rtsBool first = rtsTrue;
347 #endif
348   
349   cap = initialCapability;
350
351   // Pre-condition: this task owns initialCapability.
352   // The sched_mutex is *NOT* held
353   // NB. on return, we still hold a capability.
354
355   IF_DEBUG(scheduler,
356            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
357                        task, initialCapability);
358       );
359
360   schedulePreLoop();
361
362   // -----------------------------------------------------------
363   // Scheduler loop starts here:
364
365 #if defined(PARALLEL_HASKELL)
366 #define TERMINATION_CONDITION        (!receivedFinish)
367 #elif defined(GRAN)
368 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
369 #else
370 #define TERMINATION_CONDITION        rtsTrue
371 #endif
372
373   while (TERMINATION_CONDITION) {
374
375 #if defined(GRAN)
376       /* Choose the processor with the next event */
377       CurrentProc = event->proc;
378       CurrentTSO = event->tso;
379 #endif
380
381 #if defined(THREADED_RTS)
382       if (first) {
383           // don't yield the first time, we want a chance to run this
384           // thread for a bit, even if there are others banging at the
385           // door.
386           first = rtsFalse;
387           ASSERT_CAPABILITY_INVARIANTS(cap,task);
388       } else {
389           // Yield the capability to higher-priority tasks if necessary.
390           yieldCapability(&cap, task);
391       }
392 #endif
393       
394 #ifdef SMP
395       schedulePushWork(cap,task);
396 #endif          
397
398     // Check whether we have re-entered the RTS from Haskell without
399     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
400     // call).
401     if (cap->in_haskell) {
402           errorBelch("schedule: re-entered unsafely.\n"
403                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
404           stg_exit(EXIT_FAILURE);
405     }
406
407     //
408     // Test for interruption.  If interrupted==rtsTrue, then either
409     // we received a keyboard interrupt (^C), or the scheduler is
410     // trying to shut down all the tasks (shutting_down_scheduler) in
411     // the threaded RTS.
412     //
413     if (interrupted) {
414         deleteRunQueue(cap);
415         if (shutting_down_scheduler) {
416             IF_DEBUG(scheduler, sched_belch("shutting down"));
417             // If we are a worker, just exit.  If we're a bound thread
418             // then we will exit below when we've removed our TSO from
419             // the run queue.
420             if (task->tso == NULL) {
421                 return cap;
422             }
423         } else {
424             IF_DEBUG(scheduler, sched_belch("interrupted"));
425         }
426     }
427
428 #if defined(not_yet) && defined(SMP)
429     //
430     // Top up the run queue from our spark pool.  We try to make the
431     // number of threads in the run queue equal to the number of
432     // free capabilities.
433     //
434     {
435         StgClosure *spark;
436         if (emptyRunQueue()) {
437             spark = findSpark(rtsFalse);
438             if (spark == NULL) {
439                 break; /* no more sparks in the pool */
440             } else {
441                 createSparkThread(spark);         
442                 IF_DEBUG(scheduler,
443                          sched_belch("==^^ turning spark of closure %p into a thread",
444                                      (StgClosure *)spark));
445             }
446         }
447     }
448 #endif // SMP
449
450     scheduleStartSignalHandlers();
451
452     // Only check the black holes here if we've nothing else to do.
453     // During normal execution, the black hole list only gets checked
454     // at GC time, to avoid repeatedly traversing this possibly long
455     // list each time around the scheduler.
456     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
457
458     scheduleCheckBlockedThreads(cap);
459
460     scheduleDetectDeadlock(cap,task);
461
462     // Normally, the only way we can get here with no threads to
463     // run is if a keyboard interrupt received during 
464     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
465     // Additionally, it is not fatal for the
466     // threaded RTS to reach here with no threads to run.
467     //
468     // win32: might be here due to awaitEvent() being abandoned
469     // as a result of a console event having been delivered.
470     if ( emptyRunQueue(cap) ) {
471 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
472         ASSERT(interrupted);
473 #endif
474         continue; // nothing to do
475     }
476
477 #if defined(PARALLEL_HASKELL)
478     scheduleSendPendingMessages();
479     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
480         continue;
481
482 #if defined(SPARKS)
483     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
484 #endif
485
486     /* If we still have no work we need to send a FISH to get a spark
487        from another PE */
488     if (emptyRunQueue(cap)) {
489         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
490         ASSERT(rtsFalse); // should not happen at the moment
491     }
492     // from here: non-empty run queue.
493     //  TODO: merge above case with this, only one call processMessages() !
494     if (PacketsWaiting()) {  /* process incoming messages, if
495                                 any pending...  only in else
496                                 because getRemoteWork waits for
497                                 messages as well */
498         receivedFinish = processMessages();
499     }
500 #endif
501
502 #if defined(GRAN)
503     scheduleProcessEvent(event);
504 #endif
505
506     // 
507     // Get a thread to run
508     //
509     t = popRunQueue(cap);
510
511 #if defined(GRAN) || defined(PAR)
512     scheduleGranParReport(); // some kind of debuging output
513 #else
514     // Sanity check the thread we're about to run.  This can be
515     // expensive if there is lots of thread switching going on...
516     IF_DEBUG(sanity,checkTSO(t));
517 #endif
518
519 #if defined(THREADED_RTS)
520     // Check whether we can run this thread in the current task.
521     // If not, we have to pass our capability to the right task.
522     {
523         Task *bound = t->bound;
524       
525         if (bound) {
526             if (bound == task) {
527                 IF_DEBUG(scheduler,
528                          sched_belch("### Running thread %d in bound thread",
529                                      t->id));
530                 // yes, the Haskell thread is bound to the current native thread
531             } else {
532                 IF_DEBUG(scheduler,
533                          sched_belch("### thread %d bound to another OS thread",
534                                      t->id));
535                 // no, bound to a different Haskell thread: pass to that thread
536                 pushOnRunQueue(cap,t);
537                 continue;
538             }
539         } else {
540             // The thread we want to run is unbound.
541             if (task->tso) { 
542                 IF_DEBUG(scheduler,
543                          sched_belch("### this OS thread cannot run thread %d", t->id));
544                 // no, the current native thread is bound to a different
545                 // Haskell thread, so pass it to any worker thread
546                 pushOnRunQueue(cap,t);
547                 continue; 
548             }
549         }
550     }
551 #endif
552
553     cap->r.rCurrentTSO = t;
554     
555     /* context switches are initiated by the timer signal, unless
556      * the user specified "context switch as often as possible", with
557      * +RTS -C0
558      */
559     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
560         && !emptyThreadQueues(cap)) {
561         context_switch = 1;
562     }
563          
564 run_thread:
565
566     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
567                               (long)t->id, whatNext_strs[t->what_next]));
568
569 #if defined(PROFILING)
570     startHeapProfTimer();
571 #endif
572
573     // ----------------------------------------------------------------------
574     // Run the current thread 
575
576     prev_what_next = t->what_next;
577
578     errno = t->saved_errno;
579     cap->in_haskell = rtsTrue;
580
581     recent_activity = ACTIVITY_YES;
582
583     switch (prev_what_next) {
584         
585     case ThreadKilled:
586     case ThreadComplete:
587         /* Thread already finished, return to scheduler. */
588         ret = ThreadFinished;
589         break;
590         
591     case ThreadRunGHC:
592     {
593         StgRegTable *r;
594         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
595         cap = regTableToCapability(r);
596         ret = r->rRet;
597         break;
598     }
599     
600     case ThreadInterpret:
601         cap = interpretBCO(cap);
602         ret = cap->r.rRet;
603         break;
604         
605     default:
606         barf("schedule: invalid what_next field");
607     }
608
609     cap->in_haskell = rtsFalse;
610
611     // The TSO might have moved, eg. if it re-entered the RTS and a GC
612     // happened.  So find the new location:
613     t = cap->r.rCurrentTSO;
614
615 #ifdef SMP
616     // If ret is ThreadBlocked, and this Task is bound to the TSO that
617     // blocked, we are in limbo - the TSO is now owned by whatever it
618     // is blocked on, and may in fact already have been woken up,
619     // perhaps even on a different Capability.  It may be the case
620     // that task->cap != cap.  We better yield this Capability
621     // immediately and return to normaility.
622     if (ret == ThreadBlocked) {
623         IF_DEBUG(scheduler,
624                  debugBelch("--<< thread %d (%s) stopped: blocked\n",
625                             t->id, whatNext_strs[t->what_next]));
626         continue;
627     }
628 #endif
629
630     ASSERT_CAPABILITY_INVARIANTS(cap,task);
631
632     // And save the current errno in this thread.
633     t->saved_errno = errno;
634
635     // ----------------------------------------------------------------------
636     
637     // Costs for the scheduler are assigned to CCS_SYSTEM
638 #if defined(PROFILING)
639     stopHeapProfTimer();
640     CCCS = CCS_SYSTEM;
641 #endif
642     
643     // We have run some Haskell code: there might be blackhole-blocked
644     // threads to wake up now.
645     // Lock-free test here should be ok, we're just setting a flag.
646     if ( blackhole_queue != END_TSO_QUEUE ) {
647         blackholes_need_checking = rtsTrue;
648     }
649     
650 #if defined(THREADED_RTS)
651     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
652 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
653     IF_DEBUG(scheduler,debugBelch("sched: "););
654 #endif
655     
656     schedulePostRunThread();
657
658     ready_to_gc = rtsFalse;
659
660     switch (ret) {
661     case HeapOverflow:
662         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
663         break;
664
665     case StackOverflow:
666         scheduleHandleStackOverflow(cap,task,t);
667         break;
668
669     case ThreadYielding:
670         if (scheduleHandleYield(cap, t, prev_what_next)) {
671             // shortcut for switching between compiler/interpreter:
672             goto run_thread; 
673         }
674         break;
675
676     case ThreadBlocked:
677         scheduleHandleThreadBlocked(t);
678         break;
679
680     case ThreadFinished:
681         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
682         ASSERT_CAPABILITY_INVARIANTS(cap,task);
683         break;
684
685     default:
686       barf("schedule: invalid thread return code %d", (int)ret);
687     }
688
689     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
690     if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
691   } /* end of while() */
692
693   IF_PAR_DEBUG(verbose,
694                debugBelch("== Leaving schedule() after having received Finish\n"));
695 }
696
697 /* ----------------------------------------------------------------------------
698  * Setting up the scheduler loop
699  * ------------------------------------------------------------------------- */
700
701 static void
702 schedulePreLoop(void)
703 {
704 #if defined(GRAN) 
705     /* set up first event to get things going */
706     /* ToDo: assign costs for system setup and init MainTSO ! */
707     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
708               ContinueThread, 
709               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
710     
711     IF_DEBUG(gran,
712              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
713                         CurrentTSO);
714              G_TSO(CurrentTSO, 5));
715     
716     if (RtsFlags.GranFlags.Light) {
717         /* Save current time; GranSim Light only */
718         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
719     }      
720 #endif
721 }
722
723 /* -----------------------------------------------------------------------------
724  * schedulePushWork()
725  *
726  * Push work to other Capabilities if we have some.
727  * -------------------------------------------------------------------------- */
728
729 #ifdef SMP
730 static void
731 schedulePushWork(Capability *cap USED_WHEN_SMP, 
732                  Task *task      USED_WHEN_SMP)
733 {
734     Capability *free_caps[n_capabilities], *cap0;
735     nat i, n_free_caps;
736
737     // Check whether we have more threads on our run queue that we
738     // could hand to another Capability.
739     if (emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) {
740         return;
741     }
742
743     // First grab as many free Capabilities as we can.
744     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
745         cap0 = &capabilities[i];
746         if (cap != cap0 && tryGrabCapability(cap0,task)) {
747             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
748                 // it already has some work, we just grabbed it at 
749                 // the wrong moment.  Or maybe it's deadlocked!
750                 releaseCapability(cap0);
751             } else {
752                 free_caps[n_free_caps++] = cap0;
753             }
754         }
755     }
756
757     // we now have n_free_caps free capabilities stashed in
758     // free_caps[].  Share our run queue equally with them.  This is
759     // probably the simplest thing we could do; improvements we might
760     // want to do include:
761     //
762     //   - giving high priority to moving relatively new threads, on 
763     //     the gournds that they haven't had time to build up a
764     //     working set in the cache on this CPU/Capability.
765     //
766     //   - giving low priority to moving long-lived threads
767
768     if (n_free_caps > 0) {
769         StgTSO *prev, *t, *next;
770         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
771
772         prev = cap->run_queue_hd;
773         t = prev->link;
774         prev->link = END_TSO_QUEUE;
775         i = 0;
776         for (; t != END_TSO_QUEUE; t = next) {
777             next = t->link;
778             t->link = END_TSO_QUEUE;
779             if (t->what_next == ThreadRelocated) {
780                 prev->link = t;
781                 prev = t;
782             } else if (i == n_free_caps) {
783                 i = 0;
784                 // keep one for us
785                 prev->link = t;
786                 prev = t;
787             } else {
788                 appendToRunQueue(free_caps[i],t);
789                 if (t->bound) { t->bound->cap = free_caps[i]; }
790                 i++;
791             }
792         }
793         cap->run_queue_tl = prev;
794
795         // release the capabilities
796         for (i = 0; i < n_free_caps; i++) {
797             task->cap = free_caps[i];
798             releaseCapability(free_caps[i]);
799         }
800     }
801     task->cap = cap; // reset to point to our Capability.
802 }
803 #endif
804
805 /* ----------------------------------------------------------------------------
806  * Start any pending signal handlers
807  * ------------------------------------------------------------------------- */
808
809 static void
810 scheduleStartSignalHandlers(void)
811 {
812 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
813     if (signals_pending()) { // safe outside the lock
814         startSignalHandlers();
815     }
816 #endif
817 }
818
819 /* ----------------------------------------------------------------------------
820  * Check for blocked threads that can be woken up.
821  * ------------------------------------------------------------------------- */
822
823 static void
824 scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
825 {
826 #if !defined(THREADED_RTS)
827     //
828     // Check whether any waiting threads need to be woken up.  If the
829     // run queue is empty, and there are no other tasks running, we
830     // can wait indefinitely for something to happen.
831     //
832     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
833     {
834         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
835     }
836 #endif
837 }
838
839
840 /* ----------------------------------------------------------------------------
841  * Check for threads blocked on BLACKHOLEs that can be woken up
842  * ------------------------------------------------------------------------- */
843 static void
844 scheduleCheckBlackHoles (Capability *cap)
845 {
846     if ( blackholes_need_checking ) // check without the lock first
847     {
848         ACQUIRE_LOCK(&sched_mutex);
849         if ( blackholes_need_checking ) {
850             checkBlackHoles(cap);
851             blackholes_need_checking = rtsFalse;
852         }
853         RELEASE_LOCK(&sched_mutex);
854     }
855 }
856
857 /* ----------------------------------------------------------------------------
858  * Detect deadlock conditions and attempt to resolve them.
859  * ------------------------------------------------------------------------- */
860
861 static void
862 scheduleDetectDeadlock (Capability *cap, Task *task)
863 {
864
865 #if defined(PARALLEL_HASKELL)
866     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
867     return;
868 #endif
869
870     /* 
871      * Detect deadlock: when we have no threads to run, there are no
872      * threads blocked, waiting for I/O, or sleeping, and all the
873      * other tasks are waiting for work, we must have a deadlock of
874      * some description.
875      */
876     if ( emptyThreadQueues(cap) )
877     {
878 #if defined(THREADED_RTS)
879         /* 
880          * In the threaded RTS, we only check for deadlock if there
881          * has been no activity in a complete timeslice.  This means
882          * we won't eagerly start a full GC just because we don't have
883          * any threads to run currently.
884          */
885         if (recent_activity != ACTIVITY_INACTIVE) return;
886 #endif
887
888         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
889
890         // Garbage collection can release some new threads due to
891         // either (a) finalizers or (b) threads resurrected because
892         // they are unreachable and will therefore be sent an
893         // exception.  Any threads thus released will be immediately
894         // runnable.
895         scheduleDoGC( cap, task, rtsTrue/*force  major GC*/ );
896         recent_activity = ACTIVITY_DONE_GC;
897         
898         if ( !emptyRunQueue(cap) ) return;
899
900 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
901         /* If we have user-installed signal handlers, then wait
902          * for signals to arrive rather then bombing out with a
903          * deadlock.
904          */
905         if ( anyUserHandlers() ) {
906             IF_DEBUG(scheduler, 
907                      sched_belch("still deadlocked, waiting for signals..."));
908
909             awaitUserSignals();
910
911             if (signals_pending()) {
912                 startSignalHandlers();
913             }
914
915             // either we have threads to run, or we were interrupted:
916             ASSERT(!emptyRunQueue(cap) || interrupted);
917         }
918 #endif
919
920 #if !defined(THREADED_RTS)
921         /* Probably a real deadlock.  Send the current main thread the
922          * Deadlock exception.
923          */
924         if (task->tso) {
925             switch (task->tso->why_blocked) {
926             case BlockedOnSTM:
927             case BlockedOnBlackHole:
928             case BlockedOnException:
929             case BlockedOnMVar:
930                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
931                 return;
932             default:
933                 barf("deadlock: main thread blocked in a strange way");
934             }
935         }
936         return;
937 #endif
938     }
939 }
940
941 /* ----------------------------------------------------------------------------
942  * Process an event (GRAN only)
943  * ------------------------------------------------------------------------- */
944
945 #if defined(GRAN)
946 static StgTSO *
947 scheduleProcessEvent(rtsEvent *event)
948 {
949     StgTSO *t;
950
951     if (RtsFlags.GranFlags.Light)
952       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
953
954     /* adjust time based on time-stamp */
955     if (event->time > CurrentTime[CurrentProc] &&
956         event->evttype != ContinueThread)
957       CurrentTime[CurrentProc] = event->time;
958     
959     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
960     if (!RtsFlags.GranFlags.Light)
961       handleIdlePEs();
962
963     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
964
965     /* main event dispatcher in GranSim */
966     switch (event->evttype) {
967       /* Should just be continuing execution */
968     case ContinueThread:
969       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
970       /* ToDo: check assertion
971       ASSERT(run_queue_hd != (StgTSO*)NULL &&
972              run_queue_hd != END_TSO_QUEUE);
973       */
974       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
975       if (!RtsFlags.GranFlags.DoAsyncFetch &&
976           procStatus[CurrentProc]==Fetching) {
977         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
978               CurrentTSO->id, CurrentTSO, CurrentProc);
979         goto next_thread;
980       } 
981       /* Ignore ContinueThreads for completed threads */
982       if (CurrentTSO->what_next == ThreadComplete) {
983         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
984               CurrentTSO->id, CurrentTSO, CurrentProc);
985         goto next_thread;
986       } 
987       /* Ignore ContinueThreads for threads that are being migrated */
988       if (PROCS(CurrentTSO)==Nowhere) { 
989         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
990               CurrentTSO->id, CurrentTSO, CurrentProc);
991         goto next_thread;
992       }
993       /* The thread should be at the beginning of the run queue */
994       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
995         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
996               CurrentTSO->id, CurrentTSO, CurrentProc);
997         break; // run the thread anyway
998       }
999       /*
1000       new_event(proc, proc, CurrentTime[proc],
1001                 FindWork,
1002                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1003       goto next_thread; 
1004       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1005       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1006
1007     case FetchNode:
1008       do_the_fetchnode(event);
1009       goto next_thread;             /* handle next event in event queue  */
1010       
1011     case GlobalBlock:
1012       do_the_globalblock(event);
1013       goto next_thread;             /* handle next event in event queue  */
1014       
1015     case FetchReply:
1016       do_the_fetchreply(event);
1017       goto next_thread;             /* handle next event in event queue  */
1018       
1019     case UnblockThread:   /* Move from the blocked queue to the tail of */
1020       do_the_unblock(event);
1021       goto next_thread;             /* handle next event in event queue  */
1022       
1023     case ResumeThread:  /* Move from the blocked queue to the tail of */
1024       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1025       event->tso->gran.blocktime += 
1026         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1027       do_the_startthread(event);
1028       goto next_thread;             /* handle next event in event queue  */
1029       
1030     case StartThread:
1031       do_the_startthread(event);
1032       goto next_thread;             /* handle next event in event queue  */
1033       
1034     case MoveThread:
1035       do_the_movethread(event);
1036       goto next_thread;             /* handle next event in event queue  */
1037       
1038     case MoveSpark:
1039       do_the_movespark(event);
1040       goto next_thread;             /* handle next event in event queue  */
1041       
1042     case FindWork:
1043       do_the_findwork(event);
1044       goto next_thread;             /* handle next event in event queue  */
1045       
1046     default:
1047       barf("Illegal event type %u\n", event->evttype);
1048     }  /* switch */
1049     
1050     /* This point was scheduler_loop in the old RTS */
1051
1052     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1053
1054     TimeOfLastEvent = CurrentTime[CurrentProc];
1055     TimeOfNextEvent = get_time_of_next_event();
1056     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1057     // CurrentTSO = ThreadQueueHd;
1058
1059     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1060                          TimeOfNextEvent));
1061
1062     if (RtsFlags.GranFlags.Light) 
1063       GranSimLight_leave_system(event, &ActiveTSO); 
1064
1065     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1066
1067     IF_DEBUG(gran, 
1068              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1069
1070     /* in a GranSim setup the TSO stays on the run queue */
1071     t = CurrentTSO;
1072     /* Take a thread from the run queue. */
1073     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1074
1075     IF_DEBUG(gran, 
1076              debugBelch("GRAN: About to run current thread, which is\n");
1077              G_TSO(t,5));
1078
1079     context_switch = 0; // turned on via GranYield, checking events and time slice
1080
1081     IF_DEBUG(gran, 
1082              DumpGranEvent(GR_SCHEDULE, t));
1083
1084     procStatus[CurrentProc] = Busy;
1085 }
1086 #endif // GRAN
1087
1088 /* ----------------------------------------------------------------------------
1089  * Send pending messages (PARALLEL_HASKELL only)
1090  * ------------------------------------------------------------------------- */
1091
1092 #if defined(PARALLEL_HASKELL)
1093 static StgTSO *
1094 scheduleSendPendingMessages(void)
1095 {
1096     StgSparkPool *pool;
1097     rtsSpark spark;
1098     StgTSO *t;
1099
1100 # if defined(PAR) // global Mem.Mgmt., omit for now
1101     if (PendingFetches != END_BF_QUEUE) {
1102         processFetches();
1103     }
1104 # endif
1105     
1106     if (RtsFlags.ParFlags.BufferTime) {
1107         // if we use message buffering, we must send away all message
1108         // packets which have become too old...
1109         sendOldBuffers(); 
1110     }
1111 }
1112 #endif
1113
1114 /* ----------------------------------------------------------------------------
1115  * Activate spark threads (PARALLEL_HASKELL only)
1116  * ------------------------------------------------------------------------- */
1117
1118 #if defined(PARALLEL_HASKELL)
1119 static void
1120 scheduleActivateSpark(void)
1121 {
1122 #if defined(SPARKS)
1123   ASSERT(emptyRunQueue());
1124 /* We get here if the run queue is empty and want some work.
1125    We try to turn a spark into a thread, and add it to the run queue,
1126    from where it will be picked up in the next iteration of the scheduler
1127    loop.
1128 */
1129
1130       /* :-[  no local threads => look out for local sparks */
1131       /* the spark pool for the current PE */
1132       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1133       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1134           pool->hd < pool->tl) {
1135         /* 
1136          * ToDo: add GC code check that we really have enough heap afterwards!!
1137          * Old comment:
1138          * If we're here (no runnable threads) and we have pending
1139          * sparks, we must have a space problem.  Get enough space
1140          * to turn one of those pending sparks into a
1141          * thread... 
1142          */
1143
1144         spark = findSpark(rtsFalse);            /* get a spark */
1145         if (spark != (rtsSpark) NULL) {
1146           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1147           IF_PAR_DEBUG(fish, // schedule,
1148                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1149                              tso->id, tso, advisory_thread_count));
1150
1151           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1152             IF_PAR_DEBUG(fish, // schedule,
1153                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1154                             spark));
1155             return rtsFalse; /* failed to generate a thread */
1156           }                  /* otherwise fall through & pick-up new tso */
1157         } else {
1158           IF_PAR_DEBUG(fish, // schedule,
1159                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1160                              spark_queue_len(pool)));
1161           return rtsFalse;  /* failed to generate a thread */
1162         }
1163         return rtsTrue;  /* success in generating a thread */
1164   } else { /* no more threads permitted or pool empty */
1165     return rtsFalse;  /* failed to generateThread */
1166   }
1167 #else
1168   tso = NULL; // avoid compiler warning only
1169   return rtsFalse;  /* dummy in non-PAR setup */
1170 #endif // SPARKS
1171 }
1172 #endif // PARALLEL_HASKELL
1173
1174 /* ----------------------------------------------------------------------------
1175  * Get work from a remote node (PARALLEL_HASKELL only)
1176  * ------------------------------------------------------------------------- */
1177     
1178 #if defined(PARALLEL_HASKELL)
1179 static rtsBool
1180 scheduleGetRemoteWork(rtsBool *receivedFinish)
1181 {
1182   ASSERT(emptyRunQueue());
1183
1184   if (RtsFlags.ParFlags.BufferTime) {
1185         IF_PAR_DEBUG(verbose, 
1186                 debugBelch("...send all pending data,"));
1187         {
1188           nat i;
1189           for (i=1; i<=nPEs; i++)
1190             sendImmediately(i); // send all messages away immediately
1191         }
1192   }
1193 # ifndef SPARKS
1194         //++EDEN++ idle() , i.e. send all buffers, wait for work
1195         // suppress fishing in EDEN... just look for incoming messages
1196         // (blocking receive)
1197   IF_PAR_DEBUG(verbose, 
1198                debugBelch("...wait for incoming messages...\n"));
1199   *receivedFinish = processMessages(); // blocking receive...
1200
1201         // and reenter scheduling loop after having received something
1202         // (return rtsFalse below)
1203
1204 # else /* activate SPARKS machinery */
1205 /* We get here, if we have no work, tried to activate a local spark, but still
1206    have no work. We try to get a remote spark, by sending a FISH message.
1207    Thread migration should be added here, and triggered when a sequence of 
1208    fishes returns without work. */
1209         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1210
1211       /* =8-[  no local sparks => look for work on other PEs */
1212         /*
1213          * We really have absolutely no work.  Send out a fish
1214          * (there may be some out there already), and wait for
1215          * something to arrive.  We clearly can't run any threads
1216          * until a SCHEDULE or RESUME arrives, and so that's what
1217          * we're hoping to see.  (Of course, we still have to
1218          * respond to other types of messages.)
1219          */
1220         rtsTime now = msTime() /*CURRENT_TIME*/;
1221         IF_PAR_DEBUG(verbose, 
1222                      debugBelch("--  now=%ld\n", now));
1223         IF_PAR_DEBUG(fish, // verbose,
1224              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1225                  (last_fish_arrived_at!=0 &&
1226                   last_fish_arrived_at+delay > now)) {
1227                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1228                      now, last_fish_arrived_at+delay, 
1229                      last_fish_arrived_at,
1230                      delay);
1231              });
1232   
1233         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1234             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1235           if (last_fish_arrived_at==0 ||
1236               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1237             /* outstandingFishes is set in sendFish, processFish;
1238                avoid flooding system with fishes via delay */
1239     next_fish_to_send_at = 0;  
1240   } else {
1241     /* ToDo: this should be done in the main scheduling loop to avoid the
1242              busy wait here; not so bad if fish delay is very small  */
1243     int iq = 0; // DEBUGGING -- HWL
1244     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1245     /* send a fish when ready, but process messages that arrive in the meantime */
1246     do {
1247       if (PacketsWaiting()) {
1248         iq++; // DEBUGGING
1249         *receivedFinish = processMessages();
1250       }
1251       now = msTime();
1252     } while (!*receivedFinish || now<next_fish_to_send_at);
1253     // JB: This means the fish could become obsolete, if we receive
1254     // work. Better check for work again? 
1255     // last line: while (!receivedFinish || !haveWork || now<...)
1256     // next line: if (receivedFinish || haveWork )
1257
1258     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1259       return rtsFalse;  // NB: this will leave scheduler loop
1260                         // immediately after return!
1261                           
1262     IF_PAR_DEBUG(fish, // verbose,
1263                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1264
1265   }
1266
1267     // JB: IMHO, this should all be hidden inside sendFish(...)
1268     /* pe = choosePE(); 
1269        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1270                 NEW_FISH_HUNGER);
1271
1272     // Global statistics: count no. of fishes
1273     if (RtsFlags.ParFlags.ParStats.Global &&
1274          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1275            globalParStats.tot_fish_mess++;
1276            }
1277     */ 
1278
1279   /* delayed fishes must have been sent by now! */
1280   next_fish_to_send_at = 0;  
1281   }
1282       
1283   *receivedFinish = processMessages();
1284 # endif /* SPARKS */
1285
1286  return rtsFalse;
1287  /* NB: this function always returns rtsFalse, meaning the scheduler
1288     loop continues with the next iteration; 
1289     rationale: 
1290       return code means success in finding work; we enter this function
1291       if there is no local work, thus have to send a fish which takes
1292       time until it arrives with work; in the meantime we should process
1293       messages in the main loop;
1294  */
1295 }
1296 #endif // PARALLEL_HASKELL
1297
1298 /* ----------------------------------------------------------------------------
1299  * PAR/GRAN: Report stats & debugging info(?)
1300  * ------------------------------------------------------------------------- */
1301
1302 #if defined(PAR) || defined(GRAN)
1303 static void
1304 scheduleGranParReport(void)
1305 {
1306   ASSERT(run_queue_hd != END_TSO_QUEUE);
1307
1308   /* Take a thread from the run queue, if we have work */
1309   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1310
1311     /* If this TSO has got its outport closed in the meantime, 
1312      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1313      * It has to be marked as TH_DEAD for this purpose.
1314      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1315
1316 JB: TODO: investigate wether state change field could be nuked
1317      entirely and replaced by the normal tso state (whatnext
1318      field). All we want to do is to kill tsos from outside.
1319      */
1320
1321     /* ToDo: write something to the log-file
1322     if (RTSflags.ParFlags.granSimStats && !sameThread)
1323         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1324
1325     CurrentTSO = t;
1326     */
1327     /* the spark pool for the current PE */
1328     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1329
1330     IF_DEBUG(scheduler, 
1331              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1332                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1333
1334     IF_PAR_DEBUG(fish,
1335              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1336                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1337
1338     if (RtsFlags.ParFlags.ParStats.Full && 
1339         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1340         (emitSchedule || // forced emit
1341          (t && LastTSO && t->id != LastTSO->id))) {
1342       /* 
1343          we are running a different TSO, so write a schedule event to log file
1344          NB: If we use fair scheduling we also have to write  a deschedule 
1345              event for LastTSO; with unfair scheduling we know that the
1346              previous tso has blocked whenever we switch to another tso, so
1347              we don't need it in GUM for now
1348       */
1349       IF_PAR_DEBUG(fish, // schedule,
1350                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1351
1352       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1353                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1354       emitSchedule = rtsFalse;
1355     }
1356 }     
1357 #endif
1358
1359 /* ----------------------------------------------------------------------------
1360  * After running a thread...
1361  * ------------------------------------------------------------------------- */
1362
1363 static void
1364 schedulePostRunThread(void)
1365 {
1366 #if defined(PAR)
1367     /* HACK 675: if the last thread didn't yield, make sure to print a 
1368        SCHEDULE event to the log file when StgRunning the next thread, even
1369        if it is the same one as before */
1370     LastTSO = t; 
1371     TimeOfLastYield = CURRENT_TIME;
1372 #endif
1373
1374   /* some statistics gathering in the parallel case */
1375
1376 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1377   switch (ret) {
1378     case HeapOverflow:
1379 # if defined(GRAN)
1380       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1381       globalGranStats.tot_heapover++;
1382 # elif defined(PAR)
1383       globalParStats.tot_heapover++;
1384 # endif
1385       break;
1386
1387      case StackOverflow:
1388 # if defined(GRAN)
1389       IF_DEBUG(gran, 
1390                DumpGranEvent(GR_DESCHEDULE, t));
1391       globalGranStats.tot_stackover++;
1392 # elif defined(PAR)
1393       // IF_DEBUG(par, 
1394       // DumpGranEvent(GR_DESCHEDULE, t);
1395       globalParStats.tot_stackover++;
1396 # endif
1397       break;
1398
1399     case ThreadYielding:
1400 # if defined(GRAN)
1401       IF_DEBUG(gran, 
1402                DumpGranEvent(GR_DESCHEDULE, t));
1403       globalGranStats.tot_yields++;
1404 # elif defined(PAR)
1405       // IF_DEBUG(par, 
1406       // DumpGranEvent(GR_DESCHEDULE, t);
1407       globalParStats.tot_yields++;
1408 # endif
1409       break; 
1410
1411     case ThreadBlocked:
1412 # if defined(GRAN)
1413       IF_DEBUG(scheduler,
1414                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1415                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1416                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1417                if (t->block_info.closure!=(StgClosure*)NULL)
1418                  print_bq(t->block_info.closure);
1419                debugBelch("\n"));
1420
1421       // ??? needed; should emit block before
1422       IF_DEBUG(gran, 
1423                DumpGranEvent(GR_DESCHEDULE, t)); 
1424       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1425       /*
1426         ngoq Dogh!
1427       ASSERT(procStatus[CurrentProc]==Busy || 
1428               ((procStatus[CurrentProc]==Fetching) && 
1429               (t->block_info.closure!=(StgClosure*)NULL)));
1430       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1431           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1432             procStatus[CurrentProc]==Fetching)) 
1433         procStatus[CurrentProc] = Idle;
1434       */
1435 # elif defined(PAR)
1436 //++PAR++  blockThread() writes the event (change?)
1437 # endif
1438     break;
1439
1440   case ThreadFinished:
1441     break;
1442
1443   default:
1444     barf("parGlobalStats: unknown return code");
1445     break;
1446     }
1447 #endif
1448 }
1449
1450 /* -----------------------------------------------------------------------------
1451  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1452  * -------------------------------------------------------------------------- */
1453
1454 static rtsBool
1455 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1456 {
1457     // did the task ask for a large block?
1458     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1459         // if so, get one and push it on the front of the nursery.
1460         bdescr *bd;
1461         lnat blocks;
1462         
1463         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1464         
1465         IF_DEBUG(scheduler,
1466                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1467                             (long)t->id, whatNext_strs[t->what_next], blocks));
1468         
1469         // don't do this if the nursery is (nearly) full, we'll GC first.
1470         if (cap->r.rCurrentNursery->link != NULL ||
1471             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1472                                                // if the nursery has only one block.
1473             
1474             ACQUIRE_SM_LOCK
1475             bd = allocGroup( blocks );
1476             RELEASE_SM_LOCK
1477             cap->r.rNursery->n_blocks += blocks;
1478             
1479             // link the new group into the list
1480             bd->link = cap->r.rCurrentNursery;
1481             bd->u.back = cap->r.rCurrentNursery->u.back;
1482             if (cap->r.rCurrentNursery->u.back != NULL) {
1483                 cap->r.rCurrentNursery->u.back->link = bd;
1484             } else {
1485 #if !defined(SMP)
1486                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1487                        g0s0 == cap->r.rNursery);
1488 #endif
1489                 cap->r.rNursery->blocks = bd;
1490             }             
1491             cap->r.rCurrentNursery->u.back = bd;
1492             
1493             // initialise it as a nursery block.  We initialise the
1494             // step, gen_no, and flags field of *every* sub-block in
1495             // this large block, because this is easier than making
1496             // sure that we always find the block head of a large
1497             // block whenever we call Bdescr() (eg. evacuate() and
1498             // isAlive() in the GC would both have to do this, at
1499             // least).
1500             { 
1501                 bdescr *x;
1502                 for (x = bd; x < bd + blocks; x++) {
1503                     x->step = cap->r.rNursery;
1504                     x->gen_no = 0;
1505                     x->flags = 0;
1506                 }
1507             }
1508             
1509             // This assert can be a killer if the app is doing lots
1510             // of large block allocations.
1511             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1512             
1513             // now update the nursery to point to the new block
1514             cap->r.rCurrentNursery = bd;
1515             
1516             // we might be unlucky and have another thread get on the
1517             // run queue before us and steal the large block, but in that
1518             // case the thread will just end up requesting another large
1519             // block.
1520             pushOnRunQueue(cap,t);
1521             return rtsFalse;  /* not actually GC'ing */
1522         }
1523     }
1524     
1525     IF_DEBUG(scheduler,
1526              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1527                         (long)t->id, whatNext_strs[t->what_next]));
1528 #if defined(GRAN)
1529     ASSERT(!is_on_queue(t,CurrentProc));
1530 #elif defined(PARALLEL_HASKELL)
1531     /* Currently we emit a DESCHEDULE event before GC in GUM.
1532        ToDo: either add separate event to distinguish SYSTEM time from rest
1533        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1534     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1535         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1536                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1537         emitSchedule = rtsTrue;
1538     }
1539 #endif
1540       
1541     pushOnRunQueue(cap,t);
1542     return rtsTrue;
1543     /* actual GC is done at the end of the while loop in schedule() */
1544 }
1545
1546 /* -----------------------------------------------------------------------------
1547  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1548  * -------------------------------------------------------------------------- */
1549
1550 static void
1551 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1552 {
1553     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1554                                   (long)t->id, whatNext_strs[t->what_next]));
1555     /* just adjust the stack for this thread, then pop it back
1556      * on the run queue.
1557      */
1558     { 
1559         /* enlarge the stack */
1560         StgTSO *new_t = threadStackOverflow(cap, t);
1561         
1562         /* The TSO attached to this Task may have moved, so update the
1563          * pointer to it.
1564          */
1565         if (task->tso == t) {
1566             task->tso = new_t;
1567         }
1568         pushOnRunQueue(cap,new_t);
1569     }
1570 }
1571
1572 /* -----------------------------------------------------------------------------
1573  * Handle a thread that returned to the scheduler with ThreadYielding
1574  * -------------------------------------------------------------------------- */
1575
1576 static rtsBool
1577 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1578 {
1579     // Reset the context switch flag.  We don't do this just before
1580     // running the thread, because that would mean we would lose ticks
1581     // during GC, which can lead to unfair scheduling (a thread hogs
1582     // the CPU because the tick always arrives during GC).  This way
1583     // penalises threads that do a lot of allocation, but that seems
1584     // better than the alternative.
1585     context_switch = 0;
1586     
1587     /* put the thread back on the run queue.  Then, if we're ready to
1588      * GC, check whether this is the last task to stop.  If so, wake
1589      * up the GC thread.  getThread will block during a GC until the
1590      * GC is finished.
1591      */
1592     IF_DEBUG(scheduler,
1593              if (t->what_next != prev_what_next) {
1594                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1595                             (long)t->id, whatNext_strs[t->what_next]);
1596              } else {
1597                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1598                             (long)t->id, whatNext_strs[t->what_next]);
1599              }
1600         );
1601     
1602     IF_DEBUG(sanity,
1603              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1604              checkTSO(t));
1605     ASSERT(t->link == END_TSO_QUEUE);
1606     
1607     // Shortcut if we're just switching evaluators: don't bother
1608     // doing stack squeezing (which can be expensive), just run the
1609     // thread.
1610     if (t->what_next != prev_what_next) {
1611         return rtsTrue;
1612     }
1613     
1614 #if defined(GRAN)
1615     ASSERT(!is_on_queue(t,CurrentProc));
1616       
1617     IF_DEBUG(sanity,
1618              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1619              checkThreadQsSanity(rtsTrue));
1620
1621 #endif
1622
1623     addToRunQueue(cap,t);
1624
1625 #if defined(GRAN)
1626     /* add a ContinueThread event to actually process the thread */
1627     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1628               ContinueThread,
1629               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1630     IF_GRAN_DEBUG(bq, 
1631                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1632                   G_EVENTQ(0);
1633                   G_CURR_THREADQ(0));
1634 #endif
1635     return rtsFalse;
1636 }
1637
1638 /* -----------------------------------------------------------------------------
1639  * Handle a thread that returned to the scheduler with ThreadBlocked
1640  * -------------------------------------------------------------------------- */
1641
1642 static void
1643 scheduleHandleThreadBlocked( StgTSO *t
1644 #if !defined(GRAN) && !defined(DEBUG)
1645     STG_UNUSED
1646 #endif
1647     )
1648 {
1649 #if defined(GRAN)
1650     IF_DEBUG(scheduler,
1651              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1652                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1653              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1654     
1655     // ??? needed; should emit block before
1656     IF_DEBUG(gran, 
1657              DumpGranEvent(GR_DESCHEDULE, t)); 
1658     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1659     /*
1660       ngoq Dogh!
1661       ASSERT(procStatus[CurrentProc]==Busy || 
1662       ((procStatus[CurrentProc]==Fetching) && 
1663       (t->block_info.closure!=(StgClosure*)NULL)));
1664       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1665       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1666       procStatus[CurrentProc]==Fetching)) 
1667       procStatus[CurrentProc] = Idle;
1668     */
1669 #elif defined(PAR)
1670     IF_DEBUG(scheduler,
1671              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1672                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1673     IF_PAR_DEBUG(bq,
1674                  
1675                  if (t->block_info.closure!=(StgClosure*)NULL) 
1676                  print_bq(t->block_info.closure));
1677     
1678     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1679     blockThread(t);
1680     
1681     /* whatever we schedule next, we must log that schedule */
1682     emitSchedule = rtsTrue;
1683     
1684 #else /* !GRAN */
1685
1686       // We don't need to do anything.  The thread is blocked, and it
1687       // has tidied up its stack and placed itself on whatever queue
1688       // it needs to be on.
1689
1690 #if !defined(SMP)
1691     ASSERT(t->why_blocked != NotBlocked);
1692              // This might not be true under SMP: we don't have
1693              // exclusive access to this TSO, so someone might have
1694              // woken it up by now.  This actually happens: try
1695              // conc023 +RTS -N2.
1696 #endif
1697
1698     IF_DEBUG(scheduler,
1699              debugBelch("--<< thread %d (%s) stopped: ", 
1700                         t->id, whatNext_strs[t->what_next]);
1701              printThreadBlockage(t);
1702              debugBelch("\n"));
1703     
1704     /* Only for dumping event to log file 
1705        ToDo: do I need this in GranSim, too?
1706        blockThread(t);
1707     */
1708 #endif
1709 }
1710
1711 /* -----------------------------------------------------------------------------
1712  * Handle a thread that returned to the scheduler with ThreadFinished
1713  * -------------------------------------------------------------------------- */
1714
1715 static rtsBool
1716 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1717 {
1718     /* Need to check whether this was a main thread, and if so,
1719      * return with the return value.
1720      *
1721      * We also end up here if the thread kills itself with an
1722      * uncaught exception, see Exception.cmm.
1723      */
1724     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1725                                   t->id, whatNext_strs[t->what_next]));
1726
1727 #if defined(GRAN)
1728       endThread(t, CurrentProc); // clean-up the thread
1729 #elif defined(PARALLEL_HASKELL)
1730       /* For now all are advisory -- HWL */
1731       //if(t->priority==AdvisoryPriority) ??
1732       advisory_thread_count--; // JB: Caution with this counter, buggy!
1733       
1734 # if defined(DIST)
1735       if(t->dist.priority==RevalPriority)
1736         FinishReval(t);
1737 # endif
1738     
1739 # if defined(EDENOLD)
1740       // the thread could still have an outport... (BUG)
1741       if (t->eden.outport != -1) {
1742       // delete the outport for the tso which has finished...
1743         IF_PAR_DEBUG(eden_ports,
1744                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1745                               t->eden.outport, t->id));
1746         deleteOPT(t);
1747       }
1748       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1749       if (t->eden.epid != -1) {
1750         IF_PAR_DEBUG(eden_ports,
1751                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1752                            t->id, t->eden.epid));
1753         removeTSOfromProcess(t);
1754       }
1755 # endif 
1756
1757 # if defined(PAR)
1758       if (RtsFlags.ParFlags.ParStats.Full &&
1759           !RtsFlags.ParFlags.ParStats.Suppressed) 
1760         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1761
1762       //  t->par only contains statistics: left out for now...
1763       IF_PAR_DEBUG(fish,
1764                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1765                               t->id,t,t->par.sparkname));
1766 # endif
1767 #endif // PARALLEL_HASKELL
1768
1769       //
1770       // Check whether the thread that just completed was a bound
1771       // thread, and if so return with the result.  
1772       //
1773       // There is an assumption here that all thread completion goes
1774       // through this point; we need to make sure that if a thread
1775       // ends up in the ThreadKilled state, that it stays on the run
1776       // queue so it can be dealt with here.
1777       //
1778
1779       if (t->bound) {
1780
1781           if (t->bound != task) {
1782 #if !defined(THREADED_RTS)
1783               // Must be a bound thread that is not the topmost one.  Leave
1784               // it on the run queue until the stack has unwound to the
1785               // point where we can deal with this.  Leaving it on the run
1786               // queue also ensures that the garbage collector knows about
1787               // this thread and its return value (it gets dropped from the
1788               // all_threads list so there's no other way to find it).
1789               appendToRunQueue(cap,t);
1790               return rtsFalse;
1791 #else
1792               // this cannot happen in the threaded RTS, because a
1793               // bound thread can only be run by the appropriate Task.
1794               barf("finished bound thread that isn't mine");
1795 #endif
1796           }
1797
1798           ASSERT(task->tso == t);
1799
1800           if (t->what_next == ThreadComplete) {
1801               if (task->ret) {
1802                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1803                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1804               }
1805               task->stat = Success;
1806           } else {
1807               if (task->ret) {
1808                   *(task->ret) = NULL;
1809               }
1810               if (interrupted) {
1811                   task->stat = Interrupted;
1812               } else {
1813                   task->stat = Killed;
1814               }
1815           }
1816 #ifdef DEBUG
1817           removeThreadLabel((StgWord)task->tso->id);
1818 #endif
1819           return rtsTrue; // tells schedule() to return
1820       }
1821
1822       return rtsFalse;
1823 }
1824
1825 /* -----------------------------------------------------------------------------
1826  * Perform a heap census, if PROFILING
1827  * -------------------------------------------------------------------------- */
1828
1829 static rtsBool
1830 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1831 {
1832 #if defined(PROFILING)
1833     // When we have +RTS -i0 and we're heap profiling, do a census at
1834     // every GC.  This lets us get repeatable runs for debugging.
1835     if (performHeapProfile ||
1836         (RtsFlags.ProfFlags.profileInterval==0 &&
1837          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1838         GarbageCollect(GetRoots, rtsTrue);
1839         heapCensus();
1840         performHeapProfile = rtsFalse;
1841         return rtsTrue;  // true <=> we already GC'd
1842     }
1843 #endif
1844     return rtsFalse;
1845 }
1846
1847 /* -----------------------------------------------------------------------------
1848  * Perform a garbage collection if necessary
1849  * -------------------------------------------------------------------------- */
1850
1851 static void
1852 scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
1853 {
1854     StgTSO *t;
1855 #ifdef SMP
1856     static volatile StgWord waiting_for_gc;
1857     rtsBool was_waiting;
1858     nat i;
1859 #endif
1860
1861 #ifdef SMP
1862     // In order to GC, there must be no threads running Haskell code.
1863     // Therefore, the GC thread needs to hold *all* the capabilities,
1864     // and release them after the GC has completed.  
1865     //
1866     // This seems to be the simplest way: previous attempts involved
1867     // making all the threads with capabilities give up their
1868     // capabilities and sleep except for the *last* one, which
1869     // actually did the GC.  But it's quite hard to arrange for all
1870     // the other tasks to sleep and stay asleep.
1871     //
1872         
1873     was_waiting = cas(&waiting_for_gc, 0, 1);
1874     if (was_waiting) {
1875         do {
1876             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1877             yieldCapability(&cap,task);
1878         } while (waiting_for_gc);
1879         return;
1880     }
1881
1882     for (i=0; i < n_capabilities; i++) {
1883         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1884         if (cap != &capabilities[i]) {
1885             Capability *pcap = &capabilities[i];
1886             // we better hope this task doesn't get migrated to
1887             // another Capability while we're waiting for this one.
1888             // It won't, because load balancing happens while we have
1889             // all the Capabilities, but even so it's a slightly
1890             // unsavoury invariant.
1891             task->cap = pcap;
1892             context_switch = 1;
1893             waitForReturnCapability(&pcap, task);
1894             if (pcap != &capabilities[i]) {
1895                 barf("scheduleDoGC: got the wrong capability");
1896             }
1897         }
1898     }
1899
1900     waiting_for_gc = rtsFalse;
1901 #endif
1902
1903     /* Kick any transactions which are invalid back to their
1904      * atomically frames.  When next scheduled they will try to
1905      * commit, this commit will fail and they will retry.
1906      */
1907     { 
1908         StgTSO *next;
1909
1910         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1911             if (t->what_next == ThreadRelocated) {
1912                 next = t->link;
1913             } else {
1914                 next = t->global_link;
1915                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1916                     if (!stmValidateNestOfTransactions (t -> trec)) {
1917                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1918                         
1919                         // strip the stack back to the
1920                         // ATOMICALLY_FRAME, aborting the (nested)
1921                         // transaction, and saving the stack of any
1922                         // partially-evaluated thunks on the heap.
1923                         raiseAsync_(cap, t, NULL, rtsTrue);
1924                         
1925 #ifdef REG_R1
1926                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1927 #endif
1928                     }
1929                 }
1930             }
1931         }
1932     }
1933     
1934     // so this happens periodically:
1935     scheduleCheckBlackHoles(cap);
1936     
1937     IF_DEBUG(scheduler, printAllThreads());
1938
1939     /* everybody back, start the GC.
1940      * Could do it in this thread, or signal a condition var
1941      * to do it in another thread.  Either way, we need to
1942      * broadcast on gc_pending_cond afterward.
1943      */
1944 #if defined(THREADED_RTS)
1945     IF_DEBUG(scheduler,sched_belch("doing GC"));
1946 #endif
1947     GarbageCollect(GetRoots, force_major);
1948     
1949 #if defined(SMP)
1950     // release our stash of capabilities.
1951     for (i = 0; i < n_capabilities; i++) {
1952         if (cap != &capabilities[i]) {
1953             task->cap = &capabilities[i];
1954             releaseCapability(&capabilities[i]);
1955         }
1956     }
1957     task->cap = cap;
1958 #endif
1959
1960 #if defined(GRAN)
1961     /* add a ContinueThread event to continue execution of current thread */
1962     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1963               ContinueThread,
1964               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1965     IF_GRAN_DEBUG(bq, 
1966                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1967                   G_EVENTQ(0);
1968                   G_CURR_THREADQ(0));
1969 #endif /* GRAN */
1970 }
1971
1972 /* ---------------------------------------------------------------------------
1973  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1974  * used by Control.Concurrent for error checking.
1975  * ------------------------------------------------------------------------- */
1976  
1977 StgBool
1978 rtsSupportsBoundThreads(void)
1979 {
1980 #if defined(THREADED_RTS)
1981   return rtsTrue;
1982 #else
1983   return rtsFalse;
1984 #endif
1985 }
1986
1987 /* ---------------------------------------------------------------------------
1988  * isThreadBound(tso): check whether tso is bound to an OS thread.
1989  * ------------------------------------------------------------------------- */
1990  
1991 StgBool
1992 isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
1993 {
1994 #if defined(THREADED_RTS)
1995   return (tso->bound != NULL);
1996 #endif
1997   return rtsFalse;
1998 }
1999
2000 /* ---------------------------------------------------------------------------
2001  * Singleton fork(). Do not copy any running threads.
2002  * ------------------------------------------------------------------------- */
2003
2004 #if !defined(mingw32_HOST_OS) && !defined(SMP)
2005 #define FORKPROCESS_PRIMOP_SUPPORTED
2006 #endif
2007
2008 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2009 static void 
2010 deleteThreadImmediately(Capability *cap, StgTSO *tso);
2011 #endif
2012 StgInt
2013 forkProcess(HsStablePtr *entry
2014 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2015             STG_UNUSED
2016 #endif
2017            )
2018 {
2019 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2020     pid_t pid;
2021     StgTSO* t,*next;
2022     Task *task;
2023     Capability *cap;
2024     
2025     IF_DEBUG(scheduler,sched_belch("forking!"));
2026     
2027     // ToDo: for SMP, we should probably acquire *all* the capabilities
2028     cap = rts_lock();
2029     
2030     pid = fork();
2031     
2032     if (pid) { // parent
2033         
2034         // just return the pid
2035         rts_unlock(cap);
2036         return pid;
2037         
2038     } else { // child
2039         
2040         // delete all threads
2041         cap->run_queue_hd = END_TSO_QUEUE;
2042         cap->run_queue_tl = END_TSO_QUEUE;
2043         
2044         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2045             next = t->link;
2046             
2047             // don't allow threads to catch the ThreadKilled exception
2048             deleteThreadImmediately(cap,t);
2049         }
2050         
2051         // wipe the main thread list
2052         while ((task = all_tasks) != NULL) {
2053             all_tasks = task->all_link;
2054             discardTask(task);
2055         }
2056         
2057         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2058         rts_checkSchedStatus("forkProcess",cap);
2059         
2060         rts_unlock(cap);
2061         hs_exit();                      // clean up and exit
2062         stg_exit(EXIT_SUCCESS);
2063     }
2064 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2065     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2066     return -1;
2067 #endif
2068 }
2069
2070 /* ---------------------------------------------------------------------------
2071  * Delete the threads on the run queue of the current capability.
2072  * ------------------------------------------------------------------------- */
2073    
2074 static void
2075 deleteRunQueue (Capability *cap)
2076 {
2077     StgTSO *t, *next;
2078     for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2079         ASSERT(t->what_next != ThreadRelocated);
2080         next = t->link;
2081         deleteThread(cap, t);
2082     }
2083 }
2084
2085 /* startThread and  insertThread are now in GranSim.c -- HWL */
2086
2087
2088 /* -----------------------------------------------------------------------------
2089    Managing the suspended_ccalling_tasks list.
2090    Locks required: sched_mutex
2091    -------------------------------------------------------------------------- */
2092
2093 STATIC_INLINE void
2094 suspendTask (Capability *cap, Task *task)
2095 {
2096     ASSERT(task->next == NULL && task->prev == NULL);
2097     task->next = cap->suspended_ccalling_tasks;
2098     task->prev = NULL;
2099     if (cap->suspended_ccalling_tasks) {
2100         cap->suspended_ccalling_tasks->prev = task;
2101     }
2102     cap->suspended_ccalling_tasks = task;
2103 }
2104
2105 STATIC_INLINE void
2106 recoverSuspendedTask (Capability *cap, Task *task)
2107 {
2108     if (task->prev) {
2109         task->prev->next = task->next;
2110     } else {
2111         ASSERT(cap->suspended_ccalling_tasks == task);
2112         cap->suspended_ccalling_tasks = task->next;
2113     }
2114     if (task->next) {
2115         task->next->prev = task->prev;
2116     }
2117     task->next = task->prev = NULL;
2118 }
2119
2120 /* ---------------------------------------------------------------------------
2121  * Suspending & resuming Haskell threads.
2122  * 
2123  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2124  * its capability before calling the C function.  This allows another
2125  * task to pick up the capability and carry on running Haskell
2126  * threads.  It also means that if the C call blocks, it won't lock
2127  * the whole system.
2128  *
2129  * The Haskell thread making the C call is put to sleep for the
2130  * duration of the call, on the susepended_ccalling_threads queue.  We
2131  * give out a token to the task, which it can use to resume the thread
2132  * on return from the C function.
2133  * ------------------------------------------------------------------------- */
2134    
2135 void *
2136 suspendThread (StgRegTable *reg)
2137 {
2138   Capability *cap;
2139   int saved_errno = errno;
2140   StgTSO *tso;
2141   Task *task;
2142
2143   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2144    */
2145   cap = regTableToCapability(reg);
2146
2147   task = cap->running_task;
2148   tso = cap->r.rCurrentTSO;
2149
2150   IF_DEBUG(scheduler,
2151            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2152
2153   // XXX this might not be necessary --SDM
2154   tso->what_next = ThreadRunGHC;
2155
2156   threadPaused(tso);
2157
2158   if(tso->blocked_exceptions == NULL)  {
2159       tso->why_blocked = BlockedOnCCall;
2160       tso->blocked_exceptions = END_TSO_QUEUE;
2161   } else {
2162       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2163   }
2164
2165   // Hand back capability
2166   task->suspended_tso = tso;
2167
2168   ACQUIRE_LOCK(&cap->lock);
2169
2170   suspendTask(cap,task);
2171   cap->in_haskell = rtsFalse;
2172   releaseCapability_(cap);
2173   
2174   RELEASE_LOCK(&cap->lock);
2175
2176 #if defined(THREADED_RTS)
2177   /* Preparing to leave the RTS, so ensure there's a native thread/task
2178      waiting to take over.
2179   */
2180   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2181 #endif
2182
2183   errno = saved_errno;
2184   return task;
2185 }
2186
2187 StgRegTable *
2188 resumeThread (void *task_)
2189 {
2190     StgTSO *tso;
2191     Capability *cap;
2192     int saved_errno = errno;
2193     Task *task = task_;
2194
2195     cap = task->cap;
2196     // Wait for permission to re-enter the RTS with the result.
2197     waitForReturnCapability(&cap,task);
2198     // we might be on a different capability now... but if so, our
2199     // entry on the suspended_ccalling_tasks list will also have been
2200     // migrated.
2201
2202     // Remove the thread from the suspended list
2203     recoverSuspendedTask(cap,task);
2204
2205     tso = task->suspended_tso;
2206     task->suspended_tso = NULL;
2207     tso->link = END_TSO_QUEUE;
2208     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2209     
2210     if (tso->why_blocked == BlockedOnCCall) {
2211         awakenBlockedQueue(cap,tso->blocked_exceptions);
2212         tso->blocked_exceptions = NULL;
2213     }
2214     
2215     /* Reset blocking status */
2216     tso->why_blocked  = NotBlocked;
2217     
2218     cap->r.rCurrentTSO = tso;
2219     cap->in_haskell = rtsTrue;
2220     errno = saved_errno;
2221
2222     return &cap->r;
2223 }
2224
2225 /* ---------------------------------------------------------------------------
2226  * Comparing Thread ids.
2227  *
2228  * This is used from STG land in the implementation of the
2229  * instances of Eq/Ord for ThreadIds.
2230  * ------------------------------------------------------------------------ */
2231
2232 int
2233 cmp_thread(StgPtr tso1, StgPtr tso2) 
2234
2235   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2236   StgThreadID id2 = ((StgTSO *)tso2)->id;
2237  
2238   if (id1 < id2) return (-1);
2239   if (id1 > id2) return 1;
2240   return 0;
2241 }
2242
2243 /* ---------------------------------------------------------------------------
2244  * Fetching the ThreadID from an StgTSO.
2245  *
2246  * This is used in the implementation of Show for ThreadIds.
2247  * ------------------------------------------------------------------------ */
2248 int
2249 rts_getThreadId(StgPtr tso) 
2250 {
2251   return ((StgTSO *)tso)->id;
2252 }
2253
2254 #ifdef DEBUG
2255 void
2256 labelThread(StgPtr tso, char *label)
2257 {
2258   int len;
2259   void *buf;
2260
2261   /* Caveat: Once set, you can only set the thread name to "" */
2262   len = strlen(label)+1;
2263   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2264   strncpy(buf,label,len);
2265   /* Update will free the old memory for us */
2266   updateThreadLabel(((StgTSO *)tso)->id,buf);
2267 }
2268 #endif /* DEBUG */
2269
2270 /* ---------------------------------------------------------------------------
2271    Create a new thread.
2272
2273    The new thread starts with the given stack size.  Before the
2274    scheduler can run, however, this thread needs to have a closure
2275    (and possibly some arguments) pushed on its stack.  See
2276    pushClosure() in Schedule.h.
2277
2278    createGenThread() and createIOThread() (in SchedAPI.h) are
2279    convenient packaged versions of this function.
2280
2281    currently pri (priority) is only used in a GRAN setup -- HWL
2282    ------------------------------------------------------------------------ */
2283 #if defined(GRAN)
2284 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2285 StgTSO *
2286 createThread(nat size, StgInt pri)
2287 #else
2288 StgTSO *
2289 createThread(Capability *cap, nat size)
2290 #endif
2291 {
2292     StgTSO *tso;
2293     nat stack_size;
2294
2295     /* sched_mutex is *not* required */
2296
2297     /* First check whether we should create a thread at all */
2298 #if defined(PARALLEL_HASKELL)
2299     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2300     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2301         threadsIgnored++;
2302         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2303                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2304         return END_TSO_QUEUE;
2305     }
2306     threadsCreated++;
2307 #endif
2308
2309 #if defined(GRAN)
2310     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2311 #endif
2312
2313     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2314
2315     /* catch ridiculously small stack sizes */
2316     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2317         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2318     }
2319
2320     stack_size = size - TSO_STRUCT_SIZEW;
2321     
2322     tso = (StgTSO *)allocateLocal(cap, size);
2323     TICK_ALLOC_TSO(stack_size, 0);
2324
2325     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2326 #if defined(GRAN)
2327     SET_GRAN_HDR(tso, ThisPE);
2328 #endif
2329
2330     // Always start with the compiled code evaluator
2331     tso->what_next = ThreadRunGHC;
2332
2333     tso->why_blocked  = NotBlocked;
2334     tso->blocked_exceptions = NULL;
2335     
2336     tso->saved_errno = 0;
2337     tso->bound = NULL;
2338     
2339     tso->stack_size     = stack_size;
2340     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2341                           - TSO_STRUCT_SIZEW;
2342     tso->sp             = (P_)&(tso->stack) + stack_size;
2343
2344     tso->trec = NO_TREC;
2345     
2346 #ifdef PROFILING
2347     tso->prof.CCCS = CCS_MAIN;
2348 #endif
2349     
2350   /* put a stop frame on the stack */
2351     tso->sp -= sizeofW(StgStopFrame);
2352     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2353     tso->link = END_TSO_QUEUE;
2354     
2355   // ToDo: check this
2356 #if defined(GRAN)
2357     /* uses more flexible routine in GranSim */
2358     insertThread(tso, CurrentProc);
2359 #else
2360     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2361      * from its creation
2362      */
2363 #endif
2364     
2365 #if defined(GRAN) 
2366     if (RtsFlags.GranFlags.GranSimStats.Full) 
2367         DumpGranEvent(GR_START,tso);
2368 #elif defined(PARALLEL_HASKELL)
2369     if (RtsFlags.ParFlags.ParStats.Full) 
2370         DumpGranEvent(GR_STARTQ,tso);
2371     /* HACk to avoid SCHEDULE 
2372        LastTSO = tso; */
2373 #endif
2374     
2375     /* Link the new thread on the global thread list.
2376      */
2377     ACQUIRE_LOCK(&sched_mutex);
2378     tso->id = next_thread_id++;  // while we have the mutex
2379     tso->global_link = all_threads;
2380     all_threads = tso;
2381     RELEASE_LOCK(&sched_mutex);
2382     
2383 #if defined(DIST)
2384     tso->dist.priority = MandatoryPriority; //by default that is...
2385 #endif
2386     
2387 #if defined(GRAN)
2388     tso->gran.pri = pri;
2389 # if defined(DEBUG)
2390     tso->gran.magic = TSO_MAGIC; // debugging only
2391 # endif
2392     tso->gran.sparkname   = 0;
2393     tso->gran.startedat   = CURRENT_TIME; 
2394     tso->gran.exported    = 0;
2395     tso->gran.basicblocks = 0;
2396     tso->gran.allocs      = 0;
2397     tso->gran.exectime    = 0;
2398     tso->gran.fetchtime   = 0;
2399     tso->gran.fetchcount  = 0;
2400     tso->gran.blocktime   = 0;
2401     tso->gran.blockcount  = 0;
2402     tso->gran.blockedat   = 0;
2403     tso->gran.globalsparks = 0;
2404     tso->gran.localsparks  = 0;
2405     if (RtsFlags.GranFlags.Light)
2406         tso->gran.clock  = Now; /* local clock */
2407     else
2408         tso->gran.clock  = 0;
2409     
2410     IF_DEBUG(gran,printTSO(tso));
2411 #elif defined(PARALLEL_HASKELL)
2412 # if defined(DEBUG)
2413     tso->par.magic = TSO_MAGIC; // debugging only
2414 # endif
2415     tso->par.sparkname   = 0;
2416     tso->par.startedat   = CURRENT_TIME; 
2417     tso->par.exported    = 0;
2418     tso->par.basicblocks = 0;
2419     tso->par.allocs      = 0;
2420     tso->par.exectime    = 0;
2421     tso->par.fetchtime   = 0;
2422     tso->par.fetchcount  = 0;
2423     tso->par.blocktime   = 0;
2424     tso->par.blockcount  = 0;
2425     tso->par.blockedat   = 0;
2426     tso->par.globalsparks = 0;
2427     tso->par.localsparks  = 0;
2428 #endif
2429     
2430 #if defined(GRAN)
2431     globalGranStats.tot_threads_created++;
2432     globalGranStats.threads_created_on_PE[CurrentProc]++;
2433     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2434     globalGranStats.tot_sq_probes++;
2435 #elif defined(PARALLEL_HASKELL)
2436     // collect parallel global statistics (currently done together with GC stats)
2437     if (RtsFlags.ParFlags.ParStats.Global &&
2438         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2439         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2440         globalParStats.tot_threads_created++;
2441     }
2442 #endif 
2443     
2444 #if defined(GRAN)
2445     IF_GRAN_DEBUG(pri,
2446                   sched_belch("==__ schedule: Created TSO %d (%p);",
2447                               CurrentProc, tso, tso->id));
2448 #elif defined(PARALLEL_HASKELL)
2449     IF_PAR_DEBUG(verbose,
2450                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2451                              (long)tso->id, tso, advisory_thread_count));
2452 #else
2453     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2454                                    (long)tso->id, (long)tso->stack_size));
2455 #endif    
2456     return tso;
2457 }
2458
2459 #if defined(PAR)
2460 /* RFP:
2461    all parallel thread creation calls should fall through the following routine.
2462 */
2463 StgTSO *
2464 createThreadFromSpark(rtsSpark spark) 
2465 { StgTSO *tso;
2466   ASSERT(spark != (rtsSpark)NULL);
2467 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2468   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2469   { threadsIgnored++;
2470     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2471           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2472     return END_TSO_QUEUE;
2473   }
2474   else
2475   { threadsCreated++;
2476     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2477     if (tso==END_TSO_QUEUE)     
2478       barf("createSparkThread: Cannot create TSO");
2479 #if defined(DIST)
2480     tso->priority = AdvisoryPriority;
2481 #endif
2482     pushClosure(tso,spark);
2483     addToRunQueue(tso);
2484     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2485   }
2486   return tso;
2487 }
2488 #endif
2489
2490 /*
2491   Turn a spark into a thread.
2492   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2493 */
2494 #if 0
2495 StgTSO *
2496 activateSpark (rtsSpark spark) 
2497 {
2498   StgTSO *tso;
2499
2500   tso = createSparkThread(spark);
2501   if (RtsFlags.ParFlags.ParStats.Full) {   
2502     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2503       IF_PAR_DEBUG(verbose,
2504                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2505                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2506   }
2507   // ToDo: fwd info on local/global spark to thread -- HWL
2508   // tso->gran.exported =  spark->exported;
2509   // tso->gran.locked =   !spark->global;
2510   // tso->gran.sparkname = spark->name;
2511
2512   return tso;
2513 }
2514 #endif
2515
2516 /* ---------------------------------------------------------------------------
2517  * scheduleThread()
2518  *
2519  * scheduleThread puts a thread on the end  of the runnable queue.
2520  * This will usually be done immediately after a thread is created.
2521  * The caller of scheduleThread must create the thread using e.g.
2522  * createThread and push an appropriate closure
2523  * on this thread's stack before the scheduler is invoked.
2524  * ------------------------------------------------------------------------ */
2525
2526 void
2527 scheduleThread(Capability *cap, StgTSO *tso)
2528 {
2529     // The thread goes at the *end* of the run-queue, to avoid possible
2530     // starvation of any threads already on the queue.
2531     appendToRunQueue(cap,tso);
2532 }
2533
2534 Capability *
2535 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2536 {
2537     Task *task;
2538
2539     // We already created/initialised the Task
2540     task = cap->running_task;
2541
2542     // This TSO is now a bound thread; make the Task and TSO
2543     // point to each other.
2544     tso->bound = task;
2545
2546     task->tso = tso;
2547     task->ret = ret;
2548     task->stat = NoStatus;
2549
2550     appendToRunQueue(cap,tso);
2551
2552     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2553
2554 #if defined(GRAN)
2555     /* GranSim specific init */
2556     CurrentTSO = m->tso;                // the TSO to run
2557     procStatus[MainProc] = Busy;        // status of main PE
2558     CurrentProc = MainProc;             // PE to run it on
2559 #endif
2560
2561     cap = schedule(cap,task);
2562
2563     ASSERT(task->stat != NoStatus);
2564     ASSERT_CAPABILITY_INVARIANTS(cap,task);
2565
2566     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2567     return cap;
2568 }
2569
2570 /* ----------------------------------------------------------------------------
2571  * Starting Tasks
2572  * ------------------------------------------------------------------------- */
2573
2574 #if defined(THREADED_RTS)
2575 void
2576 workerStart(Task *task)
2577 {
2578     Capability *cap;
2579
2580     // See startWorkerTask().
2581     ACQUIRE_LOCK(&task->lock);
2582     cap = task->cap;
2583     RELEASE_LOCK(&task->lock);
2584
2585     // set the thread-local pointer to the Task:
2586     taskEnter(task);
2587
2588     // schedule() runs without a lock.
2589     cap = schedule(cap,task);
2590
2591     // On exit from schedule(), we have a Capability.
2592     releaseCapability(cap);
2593     taskStop(task);
2594 }
2595 #endif
2596
2597 /* ---------------------------------------------------------------------------
2598  * initScheduler()
2599  *
2600  * Initialise the scheduler.  This resets all the queues - if the
2601  * queues contained any threads, they'll be garbage collected at the
2602  * next pass.
2603  *
2604  * ------------------------------------------------------------------------ */
2605
2606 void 
2607 initScheduler(void)
2608 {
2609 #if defined(GRAN)
2610   nat i;
2611   for (i=0; i<=MAX_PROC; i++) {
2612     run_queue_hds[i]      = END_TSO_QUEUE;
2613     run_queue_tls[i]      = END_TSO_QUEUE;
2614     blocked_queue_hds[i]  = END_TSO_QUEUE;
2615     blocked_queue_tls[i]  = END_TSO_QUEUE;
2616     ccalling_threadss[i]  = END_TSO_QUEUE;
2617     blackhole_queue[i]    = END_TSO_QUEUE;
2618     sleeping_queue        = END_TSO_QUEUE;
2619   }
2620 #elif !defined(THREADED_RTS)
2621   blocked_queue_hd  = END_TSO_QUEUE;
2622   blocked_queue_tl  = END_TSO_QUEUE;
2623   sleeping_queue    = END_TSO_QUEUE;
2624 #endif
2625
2626   blackhole_queue   = END_TSO_QUEUE;
2627   all_threads       = END_TSO_QUEUE;
2628
2629   context_switch = 0;
2630   interrupted    = 0;
2631
2632   RtsFlags.ConcFlags.ctxtSwitchTicks =
2633       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2634       
2635 #if defined(THREADED_RTS)
2636   /* Initialise the mutex and condition variables used by
2637    * the scheduler. */
2638   initMutex(&sched_mutex);
2639 #endif
2640   
2641   ACQUIRE_LOCK(&sched_mutex);
2642
2643   /* A capability holds the state a native thread needs in
2644    * order to execute STG code. At least one capability is
2645    * floating around (only SMP builds have more than one).
2646    */
2647   initCapabilities();
2648
2649   initTaskManager();
2650
2651 #if defined(SMP)
2652   /*
2653    * Eagerly start one worker to run each Capability, except for
2654    * Capability 0.  The idea is that we're probably going to start a
2655    * bound thread on Capability 0 pretty soon, so we don't want a
2656    * worker task hogging it.
2657    */
2658   { 
2659       nat i;
2660       Capability *cap;
2661       for (i = 1; i < n_capabilities; i++) {
2662           cap = &capabilities[i];
2663           ACQUIRE_LOCK(&cap->lock);
2664           startWorkerTask(cap, workerStart);
2665           RELEASE_LOCK(&cap->lock);
2666       }
2667   }
2668 #endif
2669
2670 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2671   initSparkPools();
2672 #endif
2673
2674   RELEASE_LOCK(&sched_mutex);
2675 }
2676
2677 void
2678 exitScheduler( void )
2679 {
2680     interrupted = rtsTrue;
2681     shutting_down_scheduler = rtsTrue;
2682
2683 #if defined(THREADED_RTS)
2684     { 
2685         Task *task;
2686         nat i;
2687         
2688         ACQUIRE_LOCK(&sched_mutex);
2689         task = newBoundTask();
2690         RELEASE_LOCK(&sched_mutex);
2691
2692         for (i = 0; i < n_capabilities; i++) {
2693             shutdownCapability(&capabilities[i], task);
2694         }
2695         boundTaskExiting(task);
2696         stopTaskManager();
2697     }
2698 #endif
2699 }
2700
2701 /* ---------------------------------------------------------------------------
2702    Where are the roots that we know about?
2703
2704         - all the threads on the runnable queue
2705         - all the threads on the blocked queue
2706         - all the threads on the sleeping queue
2707         - all the thread currently executing a _ccall_GC
2708         - all the "main threads"
2709      
2710    ------------------------------------------------------------------------ */
2711
2712 /* This has to be protected either by the scheduler monitor, or by the
2713         garbage collection monitor (probably the latter).
2714         KH @ 25/10/99
2715 */
2716
2717 void
2718 GetRoots( evac_fn evac )
2719 {
2720     nat i;
2721     Capability *cap;
2722     Task *task;
2723
2724 #if defined(GRAN)
2725     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2726         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2727             evac((StgClosure **)&run_queue_hds[i]);
2728         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2729             evac((StgClosure **)&run_queue_tls[i]);
2730         
2731         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2732             evac((StgClosure **)&blocked_queue_hds[i]);
2733         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2734             evac((StgClosure **)&blocked_queue_tls[i]);
2735         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2736             evac((StgClosure **)&ccalling_threads[i]);
2737     }
2738
2739     markEventQueue();
2740
2741 #else /* !GRAN */
2742
2743     for (i = 0; i < n_capabilities; i++) {
2744         cap = &capabilities[i];
2745         evac((StgClosure **)&cap->run_queue_hd);
2746         evac((StgClosure **)&cap->run_queue_tl);
2747         
2748         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2749              task=task->next) {
2750             evac((StgClosure **)&task->suspended_tso);
2751         }
2752     }
2753     
2754 #if !defined(THREADED_RTS)
2755     evac((StgClosure **)&blocked_queue_hd);
2756     evac((StgClosure **)&blocked_queue_tl);
2757     evac((StgClosure **)&sleeping_queue);
2758 #endif 
2759 #endif
2760
2761     evac((StgClosure **)&blackhole_queue);
2762
2763 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2764     markSparkQueue(evac);
2765 #endif
2766     
2767 #if defined(RTS_USER_SIGNALS)
2768     // mark the signal handlers (signals should be already blocked)
2769     markSignalHandlers(evac);
2770 #endif
2771 }
2772
2773 /* -----------------------------------------------------------------------------
2774    performGC
2775
2776    This is the interface to the garbage collector from Haskell land.
2777    We provide this so that external C code can allocate and garbage
2778    collect when called from Haskell via _ccall_GC.
2779
2780    It might be useful to provide an interface whereby the programmer
2781    can specify more roots (ToDo).
2782    
2783    This needs to be protected by the GC condition variable above.  KH.
2784    -------------------------------------------------------------------------- */
2785
2786 static void (*extra_roots)(evac_fn);
2787
2788 void
2789 performGC(void)
2790 {
2791 #ifdef THREADED_RTS
2792     // ToDo: we have to grab all the capabilities here.
2793     errorBelch("performGC not supported in threaded RTS (yet)");
2794     stg_exit(EXIT_FAILURE);
2795 #endif
2796     /* Obligated to hold this lock upon entry */
2797     GarbageCollect(GetRoots,rtsFalse);
2798 }
2799
2800 void
2801 performMajorGC(void)
2802 {
2803 #ifdef THREADED_RTS
2804     errorBelch("performMayjorGC not supported in threaded RTS (yet)");
2805     stg_exit(EXIT_FAILURE);
2806 #endif
2807     GarbageCollect(GetRoots,rtsTrue);
2808 }
2809
2810 static void
2811 AllRoots(evac_fn evac)
2812 {
2813     GetRoots(evac);             // the scheduler's roots
2814     extra_roots(evac);          // the user's roots
2815 }
2816
2817 void
2818 performGCWithRoots(void (*get_roots)(evac_fn))
2819 {
2820 #ifdef THREADED_RTS
2821     errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
2822     stg_exit(EXIT_FAILURE);
2823 #endif
2824     extra_roots = get_roots;
2825     GarbageCollect(AllRoots,rtsFalse);
2826 }
2827
2828 /* -----------------------------------------------------------------------------
2829    Stack overflow
2830
2831    If the thread has reached its maximum stack size, then raise the
2832    StackOverflow exception in the offending thread.  Otherwise
2833    relocate the TSO into a larger chunk of memory and adjust its stack
2834    size appropriately.
2835    -------------------------------------------------------------------------- */
2836
2837 static StgTSO *
2838 threadStackOverflow(Capability *cap, StgTSO *tso)
2839 {
2840   nat new_stack_size, stack_words;
2841   lnat new_tso_size;
2842   StgPtr new_sp;
2843   StgTSO *dest;
2844
2845   IF_DEBUG(sanity,checkTSO(tso));
2846   if (tso->stack_size >= tso->max_stack_size) {
2847
2848     IF_DEBUG(gc,
2849              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2850                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2851              /* If we're debugging, just print out the top of the stack */
2852              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2853                                               tso->sp+64)));
2854
2855     /* Send this thread the StackOverflow exception */
2856     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2857     return tso;
2858   }
2859
2860   /* Try to double the current stack size.  If that takes us over the
2861    * maximum stack size for this thread, then use the maximum instead.
2862    * Finally round up so the TSO ends up as a whole number of blocks.
2863    */
2864   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2865   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2866                                        TSO_STRUCT_SIZE)/sizeof(W_);
2867   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2868   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2869
2870   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2871
2872   dest = (StgTSO *)allocate(new_tso_size);
2873   TICK_ALLOC_TSO(new_stack_size,0);
2874
2875   /* copy the TSO block and the old stack into the new area */
2876   memcpy(dest,tso,TSO_STRUCT_SIZE);
2877   stack_words = tso->stack + tso->stack_size - tso->sp;
2878   new_sp = (P_)dest + new_tso_size - stack_words;
2879   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2880
2881   /* relocate the stack pointers... */
2882   dest->sp         = new_sp;
2883   dest->stack_size = new_stack_size;
2884         
2885   /* Mark the old TSO as relocated.  We have to check for relocated
2886    * TSOs in the garbage collector and any primops that deal with TSOs.
2887    *
2888    * It's important to set the sp value to just beyond the end
2889    * of the stack, so we don't attempt to scavenge any part of the
2890    * dead TSO's stack.
2891    */
2892   tso->what_next = ThreadRelocated;
2893   tso->link = dest;
2894   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2895   tso->why_blocked = NotBlocked;
2896
2897   IF_PAR_DEBUG(verbose,
2898                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2899                      tso->id, tso, tso->stack_size);
2900                /* If we're debugging, just print out the top of the stack */
2901                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2902                                                 tso->sp+64)));
2903   
2904   IF_DEBUG(sanity,checkTSO(tso));
2905 #if 0
2906   IF_DEBUG(scheduler,printTSO(dest));
2907 #endif
2908
2909   return dest;
2910 }
2911
2912 /* ---------------------------------------------------------------------------
2913    Wake up a queue that was blocked on some resource.
2914    ------------------------------------------------------------------------ */
2915
2916 #if defined(GRAN)
2917 STATIC_INLINE void
2918 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2919 {
2920 }
2921 #elif defined(PARALLEL_HASKELL)
2922 STATIC_INLINE void
2923 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2924 {
2925   /* write RESUME events to log file and
2926      update blocked and fetch time (depending on type of the orig closure) */
2927   if (RtsFlags.ParFlags.ParStats.Full) {
2928     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2929                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2930                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2931     if (emptyRunQueue())
2932       emitSchedule = rtsTrue;
2933
2934     switch (get_itbl(node)->type) {
2935         case FETCH_ME_BQ:
2936           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2937           break;
2938         case RBH:
2939         case FETCH_ME:
2940         case BLACKHOLE_BQ:
2941           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2942           break;
2943 #ifdef DIST
2944         case MVAR:
2945           break;
2946 #endif    
2947         default:
2948           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2949         }
2950       }
2951 }
2952 #endif
2953
2954 #if defined(GRAN)
2955 StgBlockingQueueElement *
2956 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2957 {
2958     StgTSO *tso;
2959     PEs node_loc, tso_loc;
2960
2961     node_loc = where_is(node); // should be lifted out of loop
2962     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2963     tso_loc = where_is((StgClosure *)tso);
2964     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2965       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2966       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2967       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2968       // insertThread(tso, node_loc);
2969       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2970                 ResumeThread,
2971                 tso, node, (rtsSpark*)NULL);
2972       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2973       // len_local++;
2974       // len++;
2975     } else { // TSO is remote (actually should be FMBQ)
2976       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2977                                   RtsFlags.GranFlags.Costs.gunblocktime +
2978                                   RtsFlags.GranFlags.Costs.latency;
2979       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2980                 UnblockThread,
2981                 tso, node, (rtsSpark*)NULL);
2982       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2983       // len++;
2984     }
2985     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2986     IF_GRAN_DEBUG(bq,
2987                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2988                           (node_loc==tso_loc ? "Local" : "Global"), 
2989                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2990     tso->block_info.closure = NULL;
2991     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2992                              tso->id, tso));
2993 }
2994 #elif defined(PARALLEL_HASKELL)
2995 StgBlockingQueueElement *
2996 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2997 {
2998     StgBlockingQueueElement *next;
2999
3000     switch (get_itbl(bqe)->type) {
3001     case TSO:
3002       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3003       /* if it's a TSO just push it onto the run_queue */
3004       next = bqe->link;
3005       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3006       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3007       threadRunnable();
3008       unblockCount(bqe, node);
3009       /* reset blocking status after dumping event */
3010       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3011       break;
3012
3013     case BLOCKED_FETCH:
3014       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3015       next = bqe->link;
3016       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3017       PendingFetches = (StgBlockedFetch *)bqe;
3018       break;
3019
3020 # if defined(DEBUG)
3021       /* can ignore this case in a non-debugging setup; 
3022          see comments on RBHSave closures above */
3023     case CONSTR:
3024       /* check that the closure is an RBHSave closure */
3025       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3026              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3027              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3028       break;
3029
3030     default:
3031       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3032            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3033            (StgClosure *)bqe);
3034 # endif
3035     }
3036   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3037   return next;
3038 }
3039 #endif
3040
3041 StgTSO *
3042 unblockOne(Capability *cap, StgTSO *tso)
3043 {
3044   StgTSO *next;
3045
3046   ASSERT(get_itbl(tso)->type == TSO);
3047   ASSERT(tso->why_blocked != NotBlocked);
3048   tso->why_blocked = NotBlocked;
3049   next = tso->link;
3050   tso->link = END_TSO_QUEUE;
3051
3052   // We might have just migrated this TSO to our Capability:
3053   if (tso->bound) {
3054       tso->bound->cap = cap;
3055   }
3056
3057   appendToRunQueue(cap,tso);
3058
3059   // we're holding a newly woken thread, make sure we context switch
3060   // quickly so we can migrate it if necessary.
3061   context_switch = 1;
3062   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3063   return next;
3064 }
3065
3066
3067 #if defined(GRAN)
3068 void 
3069 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3070 {
3071   StgBlockingQueueElement *bqe;
3072   PEs node_loc;
3073   nat len = 0; 
3074
3075   IF_GRAN_DEBUG(bq, 
3076                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3077                       node, CurrentProc, CurrentTime[CurrentProc], 
3078                       CurrentTSO->id, CurrentTSO));
3079
3080   node_loc = where_is(node);
3081
3082   ASSERT(q == END_BQ_QUEUE ||
3083          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3084          get_itbl(q)->type == CONSTR); // closure (type constructor)
3085   ASSERT(is_unique(node));
3086
3087   /* FAKE FETCH: magically copy the node to the tso's proc;
3088      no Fetch necessary because in reality the node should not have been 
3089      moved to the other PE in the first place
3090   */
3091   if (CurrentProc!=node_loc) {
3092     IF_GRAN_DEBUG(bq, 
3093                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3094                         node, node_loc, CurrentProc, CurrentTSO->id, 
3095                         // CurrentTSO, where_is(CurrentTSO),
3096                         node->header.gran.procs));
3097     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3098     IF_GRAN_DEBUG(bq, 
3099                   debugBelch("## new bitmask of node %p is %#x\n",
3100                         node, node->header.gran.procs));
3101     if (RtsFlags.GranFlags.GranSimStats.Global) {
3102       globalGranStats.tot_fake_fetches++;
3103     }
3104   }
3105
3106   bqe = q;
3107   // ToDo: check: ASSERT(CurrentProc==node_loc);
3108   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3109     //next = bqe->link;
3110     /* 
3111        bqe points to the current element in the queue
3112        next points to the next element in the queue
3113     */
3114     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3115     //tso_loc = where_is(tso);
3116     len++;
3117     bqe = unblockOne(bqe, node);
3118   }
3119
3120   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3121      the closure to make room for the anchor of the BQ */
3122   if (bqe!=END_BQ_QUEUE) {
3123     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3124     /*
3125     ASSERT((info_ptr==&RBH_Save_0_info) ||
3126            (info_ptr==&RBH_Save_1_info) ||
3127            (info_ptr==&RBH_Save_2_info));
3128     */
3129     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3130     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3131     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3132
3133     IF_GRAN_DEBUG(bq,
3134                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3135                         node, info_type(node)));
3136   }
3137
3138   /* statistics gathering */
3139   if (RtsFlags.GranFlags.GranSimStats.Global) {
3140     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3141     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3142     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3143     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3144   }
3145   IF_GRAN_DEBUG(bq,
3146                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3147                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3148 }
3149 #elif defined(PARALLEL_HASKELL)
3150 void 
3151 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3152 {
3153   StgBlockingQueueElement *bqe;
3154
3155   IF_PAR_DEBUG(verbose, 
3156                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3157                      node, mytid));
3158 #ifdef DIST  
3159   //RFP
3160   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3161     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3162     return;
3163   }
3164 #endif
3165   
3166   ASSERT(q == END_BQ_QUEUE ||
3167          get_itbl(q)->type == TSO ||           
3168          get_itbl(q)->type == BLOCKED_FETCH || 
3169          get_itbl(q)->type == CONSTR); 
3170
3171   bqe = q;
3172   while (get_itbl(bqe)->type==TSO || 
3173          get_itbl(bqe)->type==BLOCKED_FETCH) {
3174     bqe = unblockOne(bqe, node);
3175   }
3176 }
3177
3178 #else   /* !GRAN && !PARALLEL_HASKELL */
3179
3180 void
3181 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3182 {
3183     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3184                              // Exception.cmm
3185     while (tso != END_TSO_QUEUE) {
3186         tso = unblockOne(cap,tso);
3187     }
3188 }
3189 #endif
3190
3191 /* ---------------------------------------------------------------------------
3192    Interrupt execution
3193    - usually called inside a signal handler so it mustn't do anything fancy.   
3194    ------------------------------------------------------------------------ */
3195
3196 void
3197 interruptStgRts(void)
3198 {
3199     interrupted    = 1;
3200     context_switch = 1;
3201 #if defined(THREADED_RTS)
3202     prodAllCapabilities();
3203 #endif
3204 }
3205
3206 /* -----------------------------------------------------------------------------
3207    Unblock a thread
3208
3209    This is for use when we raise an exception in another thread, which
3210    may be blocked.
3211    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3212    -------------------------------------------------------------------------- */
3213
3214 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3215 /*
3216   NB: only the type of the blocking queue is different in GranSim and GUM
3217       the operations on the queue-elements are the same
3218       long live polymorphism!
3219
3220   Locks: sched_mutex is held upon entry and exit.
3221
3222 */
3223 static void
3224 unblockThread(Capability *cap, StgTSO *tso)
3225 {
3226   StgBlockingQueueElement *t, **last;
3227
3228   switch (tso->why_blocked) {
3229
3230   case NotBlocked:
3231     return;  /* not blocked */
3232
3233   case BlockedOnSTM:
3234     // Be careful: nothing to do here!  We tell the scheduler that the thread
3235     // is runnable and we leave it to the stack-walking code to abort the 
3236     // transaction while unwinding the stack.  We should perhaps have a debugging
3237     // test to make sure that this really happens and that the 'zombie' transaction
3238     // does not get committed.
3239     goto done;
3240
3241   case BlockedOnMVar:
3242     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3243     {
3244       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3245       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3246
3247       last = (StgBlockingQueueElement **)&mvar->head;
3248       for (t = (StgBlockingQueueElement *)mvar->head; 
3249            t != END_BQ_QUEUE; 
3250            last = &t->link, last_tso = t, t = t->link) {
3251         if (t == (StgBlockingQueueElement *)tso) {
3252           *last = (StgBlockingQueueElement *)tso->link;
3253           if (mvar->tail == tso) {
3254             mvar->tail = (StgTSO *)last_tso;
3255           }
3256           goto done;
3257         }
3258       }
3259       barf("unblockThread (MVAR): TSO not found");
3260     }
3261
3262   case BlockedOnBlackHole:
3263     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3264     {
3265       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3266
3267       last = &bq->blocking_queue;
3268       for (t = bq->blocking_queue; 
3269            t != END_BQ_QUEUE; 
3270            last = &t->link, t = t->link) {
3271         if (t == (StgBlockingQueueElement *)tso) {
3272           *last = (StgBlockingQueueElement *)tso->link;
3273           goto done;
3274         }
3275       }
3276       barf("unblockThread (BLACKHOLE): TSO not found");
3277     }
3278
3279   case BlockedOnException:
3280     {
3281       StgTSO *target  = tso->block_info.tso;
3282
3283       ASSERT(get_itbl(target)->type == TSO);
3284
3285       if (target->what_next == ThreadRelocated) {
3286           target = target->link;
3287           ASSERT(get_itbl(target)->type == TSO);
3288       }
3289
3290       ASSERT(target->blocked_exceptions != NULL);
3291
3292       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3293       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3294            t != END_BQ_QUEUE; 
3295            last = &t->link, t = t->link) {
3296         ASSERT(get_itbl(t)->type == TSO);
3297         if (t == (StgBlockingQueueElement *)tso) {
3298           *last = (StgBlockingQueueElement *)tso->link;
3299           goto done;
3300         }
3301       }
3302       barf("unblockThread (Exception): TSO not found");
3303     }
3304
3305   case BlockedOnRead:
3306   case BlockedOnWrite:
3307 #if defined(mingw32_HOST_OS)
3308   case BlockedOnDoProc:
3309 #endif
3310     {
3311       /* take TSO off blocked_queue */
3312       StgBlockingQueueElement *prev = NULL;
3313       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3314            prev = t, t = t->link) {
3315         if (t == (StgBlockingQueueElement *)tso) {
3316           if (prev == NULL) {
3317             blocked_queue_hd = (StgTSO *)t->link;
3318             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3319               blocked_queue_tl = END_TSO_QUEUE;
3320             }
3321           } else {
3322             prev->link = t->link;
3323             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3324               blocked_queue_tl = (StgTSO *)prev;
3325             }
3326           }
3327 #if defined(mingw32_HOST_OS)
3328           /* (Cooperatively) signal that the worker thread should abort
3329            * the request.
3330            */
3331           abandonWorkRequest(tso->block_info.async_result->reqID);
3332 #endif
3333           goto done;
3334         }
3335       }
3336       barf("unblockThread (I/O): TSO not found");
3337     }
3338
3339   case BlockedOnDelay:
3340     {
3341       /* take TSO off sleeping_queue */
3342       StgBlockingQueueElement *prev = NULL;
3343       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3344            prev = t, t = t->link) {
3345         if (t == (StgBlockingQueueElement *)tso) {
3346           if (prev == NULL) {
3347             sleeping_queue = (StgTSO *)t->link;
3348           } else {
3349             prev->link = t->link;
3350           }
3351           goto done;
3352         }
3353       }
3354       barf("unblockThread (delay): TSO not found");
3355     }
3356
3357   default:
3358     barf("unblockThread");
3359   }
3360
3361  done:
3362   tso->link = END_TSO_QUEUE;
3363   tso->why_blocked = NotBlocked;
3364   tso->block_info.closure = NULL;
3365   pushOnRunQueue(cap,tso);
3366 }
3367 #else
3368 static void
3369 unblockThread(Capability *cap, StgTSO *tso)
3370 {
3371   StgTSO *t, **last;
3372   
3373   /* To avoid locking unnecessarily. */
3374   if (tso->why_blocked == NotBlocked) {
3375     return;
3376   }
3377
3378   switch (tso->why_blocked) {
3379
3380   case BlockedOnSTM:
3381     // Be careful: nothing to do here!  We tell the scheduler that the thread
3382     // is runnable and we leave it to the stack-walking code to abort the 
3383     // transaction while unwinding the stack.  We should perhaps have a debugging
3384     // test to make sure that this really happens and that the 'zombie' transaction
3385     // does not get committed.
3386     goto done;
3387
3388   case BlockedOnMVar:
3389     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3390     {
3391       StgTSO *last_tso = END_TSO_QUEUE;
3392       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3393
3394       last = &mvar->head;
3395       for (t = mvar->head; t != END_TSO_QUEUE; 
3396            last = &t->link, last_tso = t, t = t->link) {
3397         if (t == tso) {
3398           *last = tso->link;
3399           if (mvar->tail == tso) {
3400             mvar->tail = last_tso;
3401           }
3402           goto done;
3403         }
3404       }
3405       barf("unblockThread (MVAR): TSO not found");
3406     }
3407
3408   case BlockedOnBlackHole:
3409     {
3410       last = &blackhole_queue;
3411       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3412            last = &t->link, t = t->link) {
3413         if (t == tso) {
3414           *last = tso->link;
3415           goto done;
3416         }
3417       }
3418       barf("unblockThread (BLACKHOLE): TSO not found");
3419     }
3420
3421   case BlockedOnException:
3422     {
3423       StgTSO *target  = tso->block_info.tso;
3424
3425       ASSERT(get_itbl(target)->type == TSO);
3426
3427       while (target->what_next == ThreadRelocated) {
3428           target = target->link;
3429           ASSERT(get_itbl(target)->type == TSO);
3430       }
3431       
3432       ASSERT(target->blocked_exceptions != NULL);
3433
3434       last = &target->blocked_exceptions;
3435       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3436            last = &t->link, t = t->link) {
3437         ASSERT(get_itbl(t)->type == TSO);
3438         if (t == tso) {
3439           *last = tso->link;
3440           goto done;
3441         }
3442       }
3443       barf("unblockThread (Exception): TSO not found");
3444     }
3445
3446 #if !defined(THREADED_RTS)
3447   case BlockedOnRead:
3448   case BlockedOnWrite:
3449 #if defined(mingw32_HOST_OS)
3450   case BlockedOnDoProc:
3451 #endif
3452     {
3453       StgTSO *prev = NULL;
3454       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3455            prev = t, t = t->link) {
3456         if (t == tso) {
3457           if (prev == NULL) {
3458             blocked_queue_hd = t->link;
3459             if (blocked_queue_tl == t) {
3460               blocked_queue_tl = END_TSO_QUEUE;
3461             }
3462           } else {
3463             prev->link = t->link;
3464             if (blocked_queue_tl == t) {
3465               blocked_queue_tl = prev;
3466             }
3467           }
3468 #if defined(mingw32_HOST_OS)
3469           /* (Cooperatively) signal that the worker thread should abort
3470            * the request.
3471            */
3472           abandonWorkRequest(tso->block_info.async_result->reqID);
3473 #endif
3474           goto done;
3475         }
3476       }
3477       barf("unblockThread (I/O): TSO not found");
3478     }
3479
3480   case BlockedOnDelay:
3481     {
3482       StgTSO *prev = NULL;
3483       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3484            prev = t, t = t->link) {
3485         if (t == tso) {
3486           if (prev == NULL) {
3487             sleeping_queue = t->link;
3488           } else {
3489             prev->link = t->link;
3490           }
3491           goto done;
3492         }
3493       }
3494       barf("unblockThread (delay): TSO not found");
3495     }
3496 #endif
3497
3498   default:
3499     barf("unblockThread");
3500   }
3501
3502  done:
3503   tso->link = END_TSO_QUEUE;
3504   tso->why_blocked = NotBlocked;
3505   tso->block_info.closure = NULL;
3506   appendToRunQueue(cap,tso);
3507 }
3508 #endif
3509
3510 /* -----------------------------------------------------------------------------
3511  * checkBlackHoles()
3512  *
3513  * Check the blackhole_queue for threads that can be woken up.  We do
3514  * this periodically: before every GC, and whenever the run queue is
3515  * empty.
3516  *
3517  * An elegant solution might be to just wake up all the blocked
3518  * threads with awakenBlockedQueue occasionally: they'll go back to
3519  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3520  * doesn't give us a way to tell whether we've actually managed to
3521  * wake up any threads, so we would be busy-waiting.
3522  *
3523  * -------------------------------------------------------------------------- */
3524
3525 static rtsBool
3526 checkBlackHoles (Capability *cap)
3527 {
3528     StgTSO **prev, *t;
3529     rtsBool any_woke_up = rtsFalse;
3530     StgHalfWord type;
3531
3532     // blackhole_queue is global:
3533     ASSERT_LOCK_HELD(&sched_mutex);
3534
3535     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3536
3537     // ASSUMES: sched_mutex
3538     prev = &blackhole_queue;
3539     t = blackhole_queue;
3540     while (t != END_TSO_QUEUE) {
3541         ASSERT(t->why_blocked == BlockedOnBlackHole);
3542         type = get_itbl(t->block_info.closure)->type;
3543         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3544             IF_DEBUG(sanity,checkTSO(t));
3545             t = unblockOne(cap, t);
3546             // urk, the threads migrate to the current capability
3547             // here, but we'd like to keep them on the original one.
3548             *prev = t;
3549             any_woke_up = rtsTrue;
3550         } else {
3551             prev = &t->link;
3552             t = t->link;
3553         }
3554     }
3555
3556     return any_woke_up;
3557 }
3558
3559 /* -----------------------------------------------------------------------------
3560  * raiseAsync()
3561  *
3562  * The following function implements the magic for raising an
3563  * asynchronous exception in an existing thread.
3564  *
3565  * We first remove the thread from any queue on which it might be
3566  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3567  *
3568  * We strip the stack down to the innermost CATCH_FRAME, building
3569  * thunks in the heap for all the active computations, so they can 
3570  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3571  * an application of the handler to the exception, and push it on
3572  * the top of the stack.
3573  * 
3574  * How exactly do we save all the active computations?  We create an
3575  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3576  * AP_STACKs pushes everything from the corresponding update frame
3577  * upwards onto the stack.  (Actually, it pushes everything up to the
3578  * next update frame plus a pointer to the next AP_STACK object.
3579  * Entering the next AP_STACK object pushes more onto the stack until we
3580  * reach the last AP_STACK object - at which point the stack should look
3581  * exactly as it did when we killed the TSO and we can continue
3582  * execution by entering the closure on top of the stack.
3583  *
3584  * We can also kill a thread entirely - this happens if either (a) the 
3585  * exception passed to raiseAsync is NULL, or (b) there's no
3586  * CATCH_FRAME on the stack.  In either case, we strip the entire
3587  * stack and replace the thread with a zombie.
3588  *
3589  * ToDo: in SMP mode, this function is only safe if either (a) we hold
3590  * all the Capabilities (eg. in GC), or (b) we own the Capability that
3591  * the TSO is currently blocked on or on the run queue of.
3592  *
3593  * -------------------------------------------------------------------------- */
3594  
3595 void
3596 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3597 {
3598     raiseAsync_(cap, tso, exception, rtsFalse);
3599 }
3600
3601 static void
3602 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3603             rtsBool stop_at_atomically)
3604 {
3605     StgRetInfoTable *info;
3606     StgPtr sp;
3607   
3608     // Thread already dead?
3609     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3610         return;
3611     }
3612
3613     IF_DEBUG(scheduler, 
3614              sched_belch("raising exception in thread %ld.", (long)tso->id));
3615     
3616     // Remove it from any blocking queues
3617     unblockThread(cap,tso);
3618
3619     sp = tso->sp;
3620     
3621     // The stack freezing code assumes there's a closure pointer on
3622     // the top of the stack, so we have to arrange that this is the case...
3623     //
3624     if (sp[0] == (W_)&stg_enter_info) {
3625         sp++;
3626     } else {
3627         sp--;
3628         sp[0] = (W_)&stg_dummy_ret_closure;
3629     }
3630
3631     while (1) {
3632         nat i;
3633
3634         // 1. Let the top of the stack be the "current closure"
3635         //
3636         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3637         // CATCH_FRAME.
3638         //
3639         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3640         // current closure applied to the chunk of stack up to (but not
3641         // including) the update frame.  This closure becomes the "current
3642         // closure".  Go back to step 2.
3643         //
3644         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3645         // top of the stack applied to the exception.
3646         // 
3647         // 5. If it's a STOP_FRAME, then kill the thread.
3648         // 
3649         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3650         // transaction
3651        
3652         
3653         StgPtr frame;
3654         
3655         frame = sp + 1;
3656         info = get_ret_itbl((StgClosure *)frame);
3657         
3658         while (info->i.type != UPDATE_FRAME
3659                && (info->i.type != CATCH_FRAME || exception == NULL)
3660                && info->i.type != STOP_FRAME
3661                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3662         {
3663             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3664               // IF we find an ATOMICALLY_FRAME then we abort the
3665               // current transaction and propagate the exception.  In
3666               // this case (unlike ordinary exceptions) we do not care
3667               // whether the transaction is valid or not because its
3668               // possible validity cannot have caused the exception
3669               // and will not be visible after the abort.
3670               IF_DEBUG(stm,
3671                        debugBelch("Found atomically block delivering async exception\n"));
3672               stmAbortTransaction(tso -> trec);
3673               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3674             }
3675             frame += stack_frame_sizeW((StgClosure *)frame);
3676             info = get_ret_itbl((StgClosure *)frame);
3677         }
3678         
3679         switch (info->i.type) {
3680             
3681         case ATOMICALLY_FRAME:
3682             ASSERT(stop_at_atomically);
3683             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3684             stmCondemnTransaction(tso -> trec);
3685 #ifdef REG_R1
3686             tso->sp = frame;
3687 #else
3688             // R1 is not a register: the return convention for IO in
3689             // this case puts the return value on the stack, so we
3690             // need to set up the stack to return to the atomically
3691             // frame properly...
3692             tso->sp = frame - 2;
3693             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3694             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3695 #endif
3696             tso->what_next = ThreadRunGHC;
3697             return;
3698
3699         case CATCH_FRAME:
3700             // If we find a CATCH_FRAME, and we've got an exception to raise,
3701             // then build the THUNK raise(exception), and leave it on
3702             // top of the CATCH_FRAME ready to enter.
3703             //
3704         {
3705 #ifdef PROFILING
3706             StgCatchFrame *cf = (StgCatchFrame *)frame;
3707 #endif
3708             StgThunk *raise;
3709             
3710             // we've got an exception to raise, so let's pass it to the
3711             // handler in this frame.
3712             //
3713             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3714             TICK_ALLOC_SE_THK(1,0);
3715             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3716             raise->payload[0] = exception;
3717             
3718             // throw away the stack from Sp up to the CATCH_FRAME.
3719             //
3720             sp = frame - 1;
3721             
3722             /* Ensure that async excpetions are blocked now, so we don't get
3723              * a surprise exception before we get around to executing the
3724              * handler.
3725              */
3726             if (tso->blocked_exceptions == NULL) {
3727                 tso->blocked_exceptions = END_TSO_QUEUE;
3728             }
3729             
3730             /* Put the newly-built THUNK on top of the stack, ready to execute
3731              * when the thread restarts.
3732              */
3733             sp[0] = (W_)raise;
3734             sp[-1] = (W_)&stg_enter_info;
3735             tso->sp = sp-1;
3736             tso->what_next = ThreadRunGHC;
3737             IF_DEBUG(sanity, checkTSO(tso));
3738             return;
3739         }
3740         
3741         case UPDATE_FRAME:
3742         {
3743             StgAP_STACK * ap;
3744             nat words;
3745             
3746             // First build an AP_STACK consisting of the stack chunk above the
3747             // current update frame, with the top word on the stack as the
3748             // fun field.
3749             //
3750             words = frame - sp - 1;
3751             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3752             
3753             ap->size = words;
3754             ap->fun  = (StgClosure *)sp[0];
3755             sp++;
3756             for(i=0; i < (nat)words; ++i) {
3757                 ap->payload[i] = (StgClosure *)*sp++;
3758             }
3759             
3760             SET_HDR(ap,&stg_AP_STACK_info,
3761                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3762             TICK_ALLOC_UP_THK(words+1,0);
3763             
3764             IF_DEBUG(scheduler,
3765                      debugBelch("sched: Updating ");
3766                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3767                      debugBelch(" with ");
3768                      printObj((StgClosure *)ap);
3769                 );
3770
3771             // Replace the updatee with an indirection - happily
3772             // this will also wake up any threads currently
3773             // waiting on the result.
3774             //
3775             // Warning: if we're in a loop, more than one update frame on
3776             // the stack may point to the same object.  Be careful not to
3777             // overwrite an IND_OLDGEN in this case, because we'll screw
3778             // up the mutable lists.  To be on the safe side, don't
3779             // overwrite any kind of indirection at all.  See also
3780             // threadSqueezeStack in GC.c, where we have to make a similar
3781             // check.
3782             //
3783             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3784                 // revert the black hole
3785                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3786                                (StgClosure *)ap);
3787             }
3788             sp += sizeofW(StgUpdateFrame) - 1;
3789             sp[0] = (W_)ap; // push onto stack
3790             break;
3791         }
3792         
3793         case STOP_FRAME:
3794             // We've stripped the entire stack, the thread is now dead.
3795             sp += sizeofW(StgStopFrame);
3796             tso->what_next = ThreadKilled;
3797             tso->sp = sp;
3798             return;
3799             
3800         default:
3801             barf("raiseAsync");
3802         }
3803     }
3804     barf("raiseAsync");
3805 }
3806
3807 /* -----------------------------------------------------------------------------
3808    Deleting threads
3809
3810    This is used for interruption (^C) and forking, and corresponds to
3811    raising an exception but without letting the thread catch the
3812    exception.
3813    -------------------------------------------------------------------------- */
3814
3815 static void 
3816 deleteThread (Capability *cap, StgTSO *tso)
3817 {
3818   if (tso->why_blocked != BlockedOnCCall &&
3819       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3820       raiseAsync(cap,tso,NULL);
3821   }
3822 }
3823
3824 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3825 static void 
3826 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3827 { // for forkProcess only:
3828   // delete thread without giving it a chance to catch the KillThread exception
3829
3830   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3831       return;
3832   }
3833
3834   if (tso->why_blocked != BlockedOnCCall &&
3835       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3836       unblockThread(cap,tso);
3837   }
3838
3839   tso->what_next = ThreadKilled;
3840 }
3841 #endif
3842
3843 /* -----------------------------------------------------------------------------
3844    raiseExceptionHelper
3845    
3846    This function is called by the raise# primitve, just so that we can
3847    move some of the tricky bits of raising an exception from C-- into
3848    C.  Who knows, it might be a useful re-useable thing here too.
3849    -------------------------------------------------------------------------- */
3850
3851 StgWord
3852 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3853 {
3854     Capability *cap = regTableToCapability(reg);
3855     StgThunk *raise_closure = NULL;
3856     StgPtr p, next;
3857     StgRetInfoTable *info;
3858     //
3859     // This closure represents the expression 'raise# E' where E
3860     // is the exception raise.  It is used to overwrite all the
3861     // thunks which are currently under evaluataion.
3862     //
3863
3864     //    
3865     // LDV profiling: stg_raise_info has THUNK as its closure
3866     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3867     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3868     // 1 does not cause any problem unless profiling is performed.
3869     // However, when LDV profiling goes on, we need to linearly scan
3870     // small object pool, where raise_closure is stored, so we should
3871     // use MIN_UPD_SIZE.
3872     //
3873     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3874     //                                 sizeofW(StgClosure)+1);
3875     //
3876
3877     //
3878     // Walk up the stack, looking for the catch frame.  On the way,
3879     // we update any closures pointed to from update frames with the
3880     // raise closure that we just built.
3881     //
3882     p = tso->sp;
3883     while(1) {
3884         info = get_ret_itbl((StgClosure *)p);
3885         next = p + stack_frame_sizeW((StgClosure *)p);
3886         switch (info->i.type) {
3887             
3888         case UPDATE_FRAME:
3889             // Only create raise_closure if we need to.
3890             if (raise_closure == NULL) {
3891                 raise_closure = 
3892                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3893                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3894                 raise_closure->payload[0] = exception;
3895             }
3896             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3897             p = next;
3898             continue;
3899
3900         case ATOMICALLY_FRAME:
3901             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3902             tso->sp = p;
3903             return ATOMICALLY_FRAME;
3904             
3905         case CATCH_FRAME:
3906             tso->sp = p;
3907             return CATCH_FRAME;
3908
3909         case CATCH_STM_FRAME:
3910             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3911             tso->sp = p;
3912             return CATCH_STM_FRAME;
3913             
3914         case STOP_FRAME:
3915             tso->sp = p;
3916             return STOP_FRAME;
3917
3918         case CATCH_RETRY_FRAME:
3919         default:
3920             p = next; 
3921             continue;
3922         }
3923     }
3924 }
3925
3926
3927 /* -----------------------------------------------------------------------------
3928    findRetryFrameHelper
3929
3930    This function is called by the retry# primitive.  It traverses the stack
3931    leaving tso->sp referring to the frame which should handle the retry.  
3932
3933    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3934    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3935
3936    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3937    despite the similar implementation.
3938
3939    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3940    not be created within memory transactions.
3941    -------------------------------------------------------------------------- */
3942
3943 StgWord
3944 findRetryFrameHelper (StgTSO *tso)
3945 {
3946   StgPtr           p, next;
3947   StgRetInfoTable *info;
3948
3949   p = tso -> sp;
3950   while (1) {
3951     info = get_ret_itbl((StgClosure *)p);
3952     next = p + stack_frame_sizeW((StgClosure *)p);
3953     switch (info->i.type) {
3954       
3955     case ATOMICALLY_FRAME:
3956       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3957       tso->sp = p;
3958       return ATOMICALLY_FRAME;
3959       
3960     case CATCH_RETRY_FRAME:
3961       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3962       tso->sp = p;
3963       return CATCH_RETRY_FRAME;
3964       
3965     case CATCH_STM_FRAME:
3966     default:
3967       ASSERT(info->i.type != CATCH_FRAME);
3968       ASSERT(info->i.type != STOP_FRAME);
3969       p = next; 
3970       continue;
3971     }
3972   }
3973 }
3974
3975 /* -----------------------------------------------------------------------------
3976    resurrectThreads is called after garbage collection on the list of
3977    threads found to be garbage.  Each of these threads will be woken
3978    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3979    on an MVar, or NonTermination if the thread was blocked on a Black
3980    Hole.
3981
3982    Locks: assumes we hold *all* the capabilities.
3983    -------------------------------------------------------------------------- */
3984
3985 void
3986 resurrectThreads (StgTSO *threads)
3987 {
3988     StgTSO *tso, *next;
3989     Capability *cap;
3990
3991     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3992         next = tso->global_link;
3993         tso->global_link = all_threads;
3994         all_threads = tso;
3995         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3996         
3997         // Wake up the thread on the Capability it was last on for a
3998         // bound thread, or last_free_capability otherwise.
3999         if (tso->bound) {
4000             cap = tso->bound->cap;
4001         } else {
4002             cap = last_free_capability;
4003         }
4004         
4005         switch (tso->why_blocked) {
4006         case BlockedOnMVar:
4007         case BlockedOnException:
4008             /* Called by GC - sched_mutex lock is currently held. */
4009             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4010             break;
4011         case BlockedOnBlackHole:
4012             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4013             break;
4014         case BlockedOnSTM:
4015             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4016             break;
4017         case NotBlocked:
4018             /* This might happen if the thread was blocked on a black hole
4019              * belonging to a thread that we've just woken up (raiseAsync
4020              * can wake up threads, remember...).
4021              */
4022             continue;
4023         default:
4024             barf("resurrectThreads: thread blocked in a strange way");
4025         }
4026     }
4027 }
4028
4029 /* ----------------------------------------------------------------------------
4030  * Debugging: why is a thread blocked
4031  * [Also provides useful information when debugging threaded programs
4032  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4033    ------------------------------------------------------------------------- */
4034
4035 #if DEBUG
4036 static void
4037 printThreadBlockage(StgTSO *tso)
4038 {
4039   switch (tso->why_blocked) {
4040   case BlockedOnRead:
4041     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4042     break;
4043   case BlockedOnWrite:
4044     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4045     break;
4046 #if defined(mingw32_HOST_OS)
4047     case BlockedOnDoProc:
4048     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4049     break;
4050 #endif
4051   case BlockedOnDelay:
4052     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4053     break;
4054   case BlockedOnMVar:
4055     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4056     break;
4057   case BlockedOnException:
4058     debugBelch("is blocked on delivering an exception to thread %d",
4059             tso->block_info.tso->id);
4060     break;
4061   case BlockedOnBlackHole:
4062     debugBelch("is blocked on a black hole");
4063     break;
4064   case NotBlocked:
4065     debugBelch("is not blocked");
4066     break;
4067 #if defined(PARALLEL_HASKELL)
4068   case BlockedOnGA:
4069     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4070             tso->block_info.closure, info_type(tso->block_info.closure));
4071     break;
4072   case BlockedOnGA_NoSend:
4073     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4074             tso->block_info.closure, info_type(tso->block_info.closure));
4075     break;
4076 #endif
4077   case BlockedOnCCall:
4078     debugBelch("is blocked on an external call");
4079     break;
4080   case BlockedOnCCall_NoUnblockExc:
4081     debugBelch("is blocked on an external call (exceptions were already blocked)");
4082     break;
4083   case BlockedOnSTM:
4084     debugBelch("is blocked on an STM operation");
4085     break;
4086   default:
4087     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4088          tso->why_blocked, tso->id, tso);
4089   }
4090 }
4091
4092 void
4093 printThreadStatus(StgTSO *t)
4094 {
4095     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4096     {
4097       void *label = lookupThreadLabel(t->id);
4098       if (label) debugBelch("[\"%s\"] ",(char *)label);
4099     }
4100     if (t->what_next == ThreadRelocated) {
4101         debugBelch("has been relocated...\n");
4102     } else {
4103         switch (t->what_next) {
4104         case ThreadKilled:
4105             debugBelch("has been killed");
4106             break;
4107         case ThreadComplete:
4108             debugBelch("has completed");
4109             break;
4110         default:
4111             printThreadBlockage(t);
4112         }
4113         debugBelch("\n");
4114     }
4115 }
4116
4117 void
4118 printAllThreads(void)
4119 {
4120   StgTSO *t, *next;
4121   nat i;
4122   Capability *cap;
4123
4124 # if defined(GRAN)
4125   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4126   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4127                        time_string, rtsFalse/*no commas!*/);
4128
4129   debugBelch("all threads at [%s]:\n", time_string);
4130 # elif defined(PARALLEL_HASKELL)
4131   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4132   ullong_format_string(CURRENT_TIME,
4133                        time_string, rtsFalse/*no commas!*/);
4134
4135   debugBelch("all threads at [%s]:\n", time_string);
4136 # else
4137   debugBelch("all threads:\n");
4138 # endif
4139
4140   for (i = 0; i < n_capabilities; i++) {
4141       cap = &capabilities[i];
4142       debugBelch("threads on capability %d:\n", cap->no);
4143       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4144           printThreadStatus(t);
4145       }
4146   }
4147
4148   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4149       if (t->why_blocked != NotBlocked) {
4150           printThreadStatus(t);
4151       }
4152       if (t->what_next == ThreadRelocated) {
4153           next = t->link;
4154       } else {
4155           next = t->global_link;
4156       }
4157   }
4158 }
4159
4160 // useful from gdb
4161 void 
4162 printThreadQueue(StgTSO *t)
4163 {
4164     nat i = 0;
4165     for (; t != END_TSO_QUEUE; t = t->link) {
4166         printThreadStatus(t);
4167         i++;
4168     }
4169     debugBelch("%d threads on queue\n", i);
4170 }
4171
4172 /* 
4173    Print a whole blocking queue attached to node (debugging only).
4174 */
4175 # if defined(PARALLEL_HASKELL)
4176 void 
4177 print_bq (StgClosure *node)
4178 {
4179   StgBlockingQueueElement *bqe;
4180   StgTSO *tso;
4181   rtsBool end;
4182
4183   debugBelch("## BQ of closure %p (%s): ",
4184           node, info_type(node));
4185
4186   /* should cover all closures that may have a blocking queue */
4187   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4188          get_itbl(node)->type == FETCH_ME_BQ ||
4189          get_itbl(node)->type == RBH ||
4190          get_itbl(node)->type == MVAR);
4191     
4192   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4193
4194   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4195 }
4196
4197 /* 
4198    Print a whole blocking queue starting with the element bqe.
4199 */
4200 void 
4201 print_bqe (StgBlockingQueueElement *bqe)
4202 {
4203   rtsBool end;
4204
4205   /* 
4206      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4207   */
4208   for (end = (bqe==END_BQ_QUEUE);
4209        !end; // iterate until bqe points to a CONSTR
4210        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4211        bqe = end ? END_BQ_QUEUE : bqe->link) {
4212     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4213     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4214     /* types of closures that may appear in a blocking queue */
4215     ASSERT(get_itbl(bqe)->type == TSO ||           
4216            get_itbl(bqe)->type == BLOCKED_FETCH || 
4217            get_itbl(bqe)->type == CONSTR); 
4218     /* only BQs of an RBH end with an RBH_Save closure */
4219     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4220
4221     switch (get_itbl(bqe)->type) {
4222     case TSO:
4223       debugBelch(" TSO %u (%x),",
4224               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4225       break;
4226     case BLOCKED_FETCH:
4227       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4228               ((StgBlockedFetch *)bqe)->node, 
4229               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4230               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4231               ((StgBlockedFetch *)bqe)->ga.weight);
4232       break;
4233     case CONSTR:
4234       debugBelch(" %s (IP %p),",
4235               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4236                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4237                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4238                "RBH_Save_?"), get_itbl(bqe));
4239       break;
4240     default:
4241       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4242            info_type((StgClosure *)bqe)); // , node, info_type(node));
4243       break;
4244     }
4245   } /* for */
4246   debugBelch("\n");
4247 }
4248 # elif defined(GRAN)
4249 void 
4250 print_bq (StgClosure *node)
4251 {
4252   StgBlockingQueueElement *bqe;
4253   PEs node_loc, tso_loc;
4254   rtsBool end;
4255
4256   /* should cover all closures that may have a blocking queue */
4257   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4258          get_itbl(node)->type == FETCH_ME_BQ ||
4259          get_itbl(node)->type == RBH);
4260     
4261   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4262   node_loc = where_is(node);
4263
4264   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4265           node, info_type(node), node_loc);
4266
4267   /* 
4268      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4269   */
4270   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4271        !end; // iterate until bqe points to a CONSTR
4272        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4273     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4274     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4275     /* types of closures that may appear in a blocking queue */
4276     ASSERT(get_itbl(bqe)->type == TSO ||           
4277            get_itbl(bqe)->type == CONSTR); 
4278     /* only BQs of an RBH end with an RBH_Save closure */
4279     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4280
4281     tso_loc = where_is((StgClosure *)bqe);
4282     switch (get_itbl(bqe)->type) {
4283     case TSO:
4284       debugBelch(" TSO %d (%p) on [PE %d],",
4285               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4286       break;
4287     case CONSTR:
4288       debugBelch(" %s (IP %p),",
4289               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4290                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4291                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4292                "RBH_Save_?"), get_itbl(bqe));
4293       break;
4294     default:
4295       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4296            info_type((StgClosure *)bqe), node, info_type(node));
4297       break;
4298     }
4299   } /* for */
4300   debugBelch("\n");
4301 }
4302 # endif
4303
4304 #if defined(PARALLEL_HASKELL)
4305 static nat
4306 run_queue_len(void)
4307 {
4308     nat i;
4309     StgTSO *tso;
4310     
4311     for (i=0, tso=run_queue_hd; 
4312          tso != END_TSO_QUEUE;
4313          i++, tso=tso->link) {
4314         /* nothing */
4315     }
4316         
4317     return i;
4318 }
4319 #endif
4320
4321 void
4322 sched_belch(char *s, ...)
4323 {
4324     va_list ap;
4325     va_start(ap,s);
4326 #ifdef THREADED_RTS
4327     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4328 #elif defined(PARALLEL_HASKELL)
4329     debugBelch("== ");
4330 #else
4331     debugBelch("sched: ");
4332 #endif
4333     vdebugBelch(s, ap);
4334     debugBelch("\n");
4335     va_end(ap);
4336 }
4337
4338 #endif /* DEBUG */