check black holes before doing GC in scheduleDoHeapProfile()
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 /* -----------------------------------------------------------------------------
76  * Global variables
77  * -------------------------------------------------------------------------- */
78
79 #if defined(GRAN)
80
81 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
82 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
83
84 /* 
85    In GranSim we have a runnable and a blocked queue for each processor.
86    In order to minimise code changes new arrays run_queue_hds/tls
87    are created. run_queue_hd is then a short cut (macro) for
88    run_queue_hds[CurrentProc] (see GranSim.h).
89    -- HWL
90 */
91 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
92 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
93 StgTSO *ccalling_threadss[MAX_PROC];
94 /* We use the same global list of threads (all_threads) in GranSim as in
95    the std RTS (i.e. we are cheating). However, we don't use this list in
96    the GranSim specific code at the moment (so we are only potentially
97    cheating).  */
98
99 #else /* !GRAN */
100
101 #if !defined(THREADED_RTS)
102 // Blocked/sleeping thrads
103 StgTSO *blocked_queue_hd = NULL;
104 StgTSO *blocked_queue_tl = NULL;
105 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
106 #endif
107
108 /* Threads blocked on blackholes.
109  * LOCK: sched_mutex+capability, or all capabilities
110  */
111 StgTSO *blackhole_queue = NULL;
112 #endif
113
114 /* The blackhole_queue should be checked for threads to wake up.  See
115  * Schedule.h for more thorough comment.
116  * LOCK: none (doesn't matter if we miss an update)
117  */
118 rtsBool blackholes_need_checking = rtsFalse;
119
120 /* Linked list of all threads.
121  * Used for detecting garbage collected threads.
122  * LOCK: sched_mutex+capability, or all capabilities
123  */
124 StgTSO *all_threads = NULL;
125
126 /* flag set by signal handler to precipitate a context switch
127  * LOCK: none (just an advisory flag)
128  */
129 int context_switch = 0;
130
131 /* flag that tracks whether we have done any execution in this time slice.
132  * LOCK: currently none, perhaps we should lock (but needs to be
133  * updated in the fast path of the scheduler).
134  */
135 nat recent_activity = ACTIVITY_YES;
136
137 /* if this flag is set as well, give up execution
138  * LOCK: none (changes once, from false->true)
139  */
140 rtsBool interrupted = rtsFalse;
141
142 /* Next thread ID to allocate.
143  * LOCK: sched_mutex
144  */
145 static StgThreadID next_thread_id = 1;
146
147 /* The smallest stack size that makes any sense is:
148  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
149  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
150  *  + 1                       (the closure to enter)
151  *  + 1                       (stg_ap_v_ret)
152  *  + 1                       (spare slot req'd by stg_ap_v_ret)
153  *
154  * A thread with this stack will bomb immediately with a stack
155  * overflow, which will increase its stack size.  
156  */
157 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
158
159 #if defined(GRAN)
160 StgTSO *CurrentTSO;
161 #endif
162
163 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
164  *  exists - earlier gccs apparently didn't.
165  *  -= chak
166  */
167 StgTSO dummy_tso;
168
169 /*
170  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
171  * in an MT setting, needed to signal that a worker thread shouldn't hang around
172  * in the scheduler when it is out of work.
173  */
174 rtsBool shutting_down_scheduler = rtsFalse;
175
176 /*
177  * This mutex protects most of the global scheduler data in
178  * the THREADED_RTS runtime.
179  */
180 #if defined(THREADED_RTS)
181 Mutex sched_mutex;
182 #endif
183
184 #if defined(PARALLEL_HASKELL)
185 StgTSO *LastTSO;
186 rtsTime TimeOfLastYield;
187 rtsBool emitSchedule = rtsTrue;
188 #endif
189
190 /* -----------------------------------------------------------------------------
191  * static function prototypes
192  * -------------------------------------------------------------------------- */
193
194 static Capability *schedule (Capability *initialCapability, Task *task);
195
196 //
197 // These function all encapsulate parts of the scheduler loop, and are
198 // abstracted only to make the structure and control flow of the
199 // scheduler clearer.
200 //
201 static void schedulePreLoop (void);
202 #if defined(THREADED_RTS)
203 static void schedulePushWork(Capability *cap, Task *task);
204 #endif
205 static void scheduleStartSignalHandlers (Capability *cap);
206 static void scheduleCheckBlockedThreads (Capability *cap);
207 static void scheduleCheckBlackHoles (Capability *cap);
208 static void scheduleDetectDeadlock (Capability *cap, Task *task);
209 #if defined(GRAN)
210 static StgTSO *scheduleProcessEvent(rtsEvent *event);
211 #endif
212 #if defined(PARALLEL_HASKELL)
213 static StgTSO *scheduleSendPendingMessages(void);
214 static void scheduleActivateSpark(void);
215 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
216 #endif
217 #if defined(PAR) || defined(GRAN)
218 static void scheduleGranParReport(void);
219 #endif
220 static void schedulePostRunThread(void);
221 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
222 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
223                                          StgTSO *t);
224 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
225                                     nat prev_what_next );
226 static void scheduleHandleThreadBlocked( StgTSO *t );
227 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
228                                              StgTSO *t );
229 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
230 static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major,
231                          void (*get_roots)(evac_fn));
232
233 static void unblockThread(Capability *cap, StgTSO *tso);
234 static rtsBool checkBlackHoles(Capability *cap);
235 static void AllRoots(evac_fn evac);
236
237 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
238
239 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
240                         rtsBool stop_at_atomically, StgPtr stop_here);
241
242 static void deleteThread (Capability *cap, StgTSO *tso);
243 static void deleteRunQueue (Capability *cap);
244
245 #ifdef DEBUG
246 static void printThreadBlockage(StgTSO *tso);
247 static void printThreadStatus(StgTSO *tso);
248 void printThreadQueue(StgTSO *tso);
249 #endif
250
251 #if defined(PARALLEL_HASKELL)
252 StgTSO * createSparkThread(rtsSpark spark);
253 StgTSO * activateSpark (rtsSpark spark);  
254 #endif
255
256 #ifdef DEBUG
257 static char *whatNext_strs[] = {
258   "(unknown)",
259   "ThreadRunGHC",
260   "ThreadInterpret",
261   "ThreadKilled",
262   "ThreadRelocated",
263   "ThreadComplete"
264 };
265 #endif
266
267 /* -----------------------------------------------------------------------------
268  * Putting a thread on the run queue: different scheduling policies
269  * -------------------------------------------------------------------------- */
270
271 STATIC_INLINE void
272 addToRunQueue( Capability *cap, StgTSO *t )
273 {
274 #if defined(PARALLEL_HASKELL)
275     if (RtsFlags.ParFlags.doFairScheduling) { 
276         // this does round-robin scheduling; good for concurrency
277         appendToRunQueue(cap,t);
278     } else {
279         // this does unfair scheduling; good for parallelism
280         pushOnRunQueue(cap,t);
281     }
282 #else
283     // this does round-robin scheduling; good for concurrency
284     appendToRunQueue(cap,t);
285 #endif
286 }
287
288 /* ---------------------------------------------------------------------------
289    Main scheduling loop.
290
291    We use round-robin scheduling, each thread returning to the
292    scheduler loop when one of these conditions is detected:
293
294       * out of heap space
295       * timer expires (thread yields)
296       * thread blocks
297       * thread ends
298       * stack overflow
299
300    GRAN version:
301      In a GranSim setup this loop iterates over the global event queue.
302      This revolves around the global event queue, which determines what 
303      to do next. Therefore, it's more complicated than either the 
304      concurrent or the parallel (GUM) setup.
305
306    GUM version:
307      GUM iterates over incoming messages.
308      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
309      and sends out a fish whenever it has nothing to do; in-between
310      doing the actual reductions (shared code below) it processes the
311      incoming messages and deals with delayed operations 
312      (see PendingFetches).
313      This is not the ugliest code you could imagine, but it's bloody close.
314
315    ------------------------------------------------------------------------ */
316
317 static Capability *
318 schedule (Capability *initialCapability, Task *task)
319 {
320   StgTSO *t;
321   Capability *cap;
322   StgThreadReturnCode ret;
323 #if defined(GRAN)
324   rtsEvent *event;
325 #elif defined(PARALLEL_HASKELL)
326   StgTSO *tso;
327   GlobalTaskId pe;
328   rtsBool receivedFinish = rtsFalse;
329 # if defined(DEBUG)
330   nat tp_size, sp_size; // stats only
331 # endif
332 #endif
333   nat prev_what_next;
334   rtsBool ready_to_gc;
335 #if defined(THREADED_RTS)
336   rtsBool first = rtsTrue;
337 #endif
338   
339   cap = initialCapability;
340
341   // Pre-condition: this task owns initialCapability.
342   // The sched_mutex is *NOT* held
343   // NB. on return, we still hold a capability.
344
345   IF_DEBUG(scheduler,
346            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
347                        task, initialCapability);
348       );
349
350   schedulePreLoop();
351
352   // -----------------------------------------------------------
353   // Scheduler loop starts here:
354
355 #if defined(PARALLEL_HASKELL)
356 #define TERMINATION_CONDITION        (!receivedFinish)
357 #elif defined(GRAN)
358 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
359 #else
360 #define TERMINATION_CONDITION        rtsTrue
361 #endif
362
363   while (TERMINATION_CONDITION) {
364
365 #if defined(GRAN)
366       /* Choose the processor with the next event */
367       CurrentProc = event->proc;
368       CurrentTSO = event->tso;
369 #endif
370
371 #if defined(THREADED_RTS)
372       if (first) {
373           // don't yield the first time, we want a chance to run this
374           // thread for a bit, even if there are others banging at the
375           // door.
376           first = rtsFalse;
377           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
378       } else {
379           // Yield the capability to higher-priority tasks if necessary.
380           yieldCapability(&cap, task);
381       }
382 #endif
383       
384 #if defined(THREADED_RTS)
385       schedulePushWork(cap,task);
386 #endif
387
388     // Check whether we have re-entered the RTS from Haskell without
389     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
390     // call).
391     if (cap->in_haskell) {
392           errorBelch("schedule: re-entered unsafely.\n"
393                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
394           stg_exit(EXIT_FAILURE);
395     }
396
397     //
398     // Test for interruption.  If interrupted==rtsTrue, then either
399     // we received a keyboard interrupt (^C), or the scheduler is
400     // trying to shut down all the tasks (shutting_down_scheduler) in
401     // the threaded RTS.
402     //
403     if (interrupted) {
404         deleteRunQueue(cap);
405 #if defined(THREADED_RTS)
406         discardSparksCap(cap);
407 #endif
408         if (shutting_down_scheduler) {
409             IF_DEBUG(scheduler, sched_belch("shutting down"));
410             // If we are a worker, just exit.  If we're a bound thread
411             // then we will exit below when we've removed our TSO from
412             // the run queue.
413             if (task->tso == NULL && emptyRunQueue(cap)) {
414                 return cap;
415             }
416         } else {
417             IF_DEBUG(scheduler, sched_belch("interrupted"));
418         }
419     }
420
421 #if defined(THREADED_RTS)
422     // If the run queue is empty, take a spark and turn it into a thread.
423     {
424         if (emptyRunQueue(cap)) {
425             StgClosure *spark;
426             spark = findSpark(cap);
427             if (spark != NULL) {
428                 IF_DEBUG(scheduler,
429                          sched_belch("turning spark of closure %p into a thread",
430                                      (StgClosure *)spark));
431                 createSparkThread(cap,spark);     
432             }
433         }
434     }
435 #endif // THREADED_RTS
436
437     scheduleStartSignalHandlers(cap);
438
439     // Only check the black holes here if we've nothing else to do.
440     // During normal execution, the black hole list only gets checked
441     // at GC time, to avoid repeatedly traversing this possibly long
442     // list each time around the scheduler.
443     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
444
445     scheduleCheckBlockedThreads(cap);
446
447     scheduleDetectDeadlock(cap,task);
448 #if defined(THREADED_RTS)
449     cap = task->cap;    // reload cap, it might have changed
450 #endif
451
452     // Normally, the only way we can get here with no threads to
453     // run is if a keyboard interrupt received during 
454     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
455     // Additionally, it is not fatal for the
456     // threaded RTS to reach here with no threads to run.
457     //
458     // win32: might be here due to awaitEvent() being abandoned
459     // as a result of a console event having been delivered.
460     if ( emptyRunQueue(cap) ) {
461 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
462         ASSERT(interrupted);
463 #endif
464         continue; // nothing to do
465     }
466
467 #if defined(PARALLEL_HASKELL)
468     scheduleSendPendingMessages();
469     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
470         continue;
471
472 #if defined(SPARKS)
473     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
474 #endif
475
476     /* If we still have no work we need to send a FISH to get a spark
477        from another PE */
478     if (emptyRunQueue(cap)) {
479         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
480         ASSERT(rtsFalse); // should not happen at the moment
481     }
482     // from here: non-empty run queue.
483     //  TODO: merge above case with this, only one call processMessages() !
484     if (PacketsWaiting()) {  /* process incoming messages, if
485                                 any pending...  only in else
486                                 because getRemoteWork waits for
487                                 messages as well */
488         receivedFinish = processMessages();
489     }
490 #endif
491
492 #if defined(GRAN)
493     scheduleProcessEvent(event);
494 #endif
495
496     // 
497     // Get a thread to run
498     //
499     t = popRunQueue(cap);
500
501 #if defined(GRAN) || defined(PAR)
502     scheduleGranParReport(); // some kind of debuging output
503 #else
504     // Sanity check the thread we're about to run.  This can be
505     // expensive if there is lots of thread switching going on...
506     IF_DEBUG(sanity,checkTSO(t));
507 #endif
508
509 #if defined(THREADED_RTS)
510     // Check whether we can run this thread in the current task.
511     // If not, we have to pass our capability to the right task.
512     {
513         Task *bound = t->bound;
514       
515         if (bound) {
516             if (bound == task) {
517                 IF_DEBUG(scheduler,
518                          sched_belch("### Running thread %d in bound thread",
519                                      t->id));
520                 // yes, the Haskell thread is bound to the current native thread
521             } else {
522                 IF_DEBUG(scheduler,
523                          sched_belch("### thread %d bound to another OS thread",
524                                      t->id));
525                 // no, bound to a different Haskell thread: pass to that thread
526                 pushOnRunQueue(cap,t);
527                 continue;
528             }
529         } else {
530             // The thread we want to run is unbound.
531             if (task->tso) { 
532                 IF_DEBUG(scheduler,
533                          sched_belch("### this OS thread cannot run thread %d", t->id));
534                 // no, the current native thread is bound to a different
535                 // Haskell thread, so pass it to any worker thread
536                 pushOnRunQueue(cap,t);
537                 continue; 
538             }
539         }
540     }
541 #endif
542
543     cap->r.rCurrentTSO = t;
544     
545     /* context switches are initiated by the timer signal, unless
546      * the user specified "context switch as often as possible", with
547      * +RTS -C0
548      */
549     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
550         && !emptyThreadQueues(cap)) {
551         context_switch = 1;
552     }
553          
554 run_thread:
555
556     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
557                               (long)t->id, whatNext_strs[t->what_next]));
558
559 #if defined(PROFILING)
560     startHeapProfTimer();
561 #endif
562
563     // ----------------------------------------------------------------------
564     // Run the current thread 
565
566     prev_what_next = t->what_next;
567
568     errno = t->saved_errno;
569     cap->in_haskell = rtsTrue;
570
571     dirtyTSO(t);
572
573     recent_activity = ACTIVITY_YES;
574
575     switch (prev_what_next) {
576         
577     case ThreadKilled:
578     case ThreadComplete:
579         /* Thread already finished, return to scheduler. */
580         ret = ThreadFinished;
581         break;
582         
583     case ThreadRunGHC:
584     {
585         StgRegTable *r;
586         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
587         cap = regTableToCapability(r);
588         ret = r->rRet;
589         break;
590     }
591     
592     case ThreadInterpret:
593         cap = interpretBCO(cap);
594         ret = cap->r.rRet;
595         break;
596         
597     default:
598         barf("schedule: invalid what_next field");
599     }
600
601     cap->in_haskell = rtsFalse;
602
603     // The TSO might have moved, eg. if it re-entered the RTS and a GC
604     // happened.  So find the new location:
605     t = cap->r.rCurrentTSO;
606
607     // We have run some Haskell code: there might be blackhole-blocked
608     // threads to wake up now.
609     // Lock-free test here should be ok, we're just setting a flag.
610     if ( blackhole_queue != END_TSO_QUEUE ) {
611         blackholes_need_checking = rtsTrue;
612     }
613     
614     // And save the current errno in this thread.
615     // XXX: possibly bogus for SMP because this thread might already
616     // be running again, see code below.
617     t->saved_errno = errno;
618
619 #ifdef SMP
620     // If ret is ThreadBlocked, and this Task is bound to the TSO that
621     // blocked, we are in limbo - the TSO is now owned by whatever it
622     // is blocked on, and may in fact already have been woken up,
623     // perhaps even on a different Capability.  It may be the case
624     // that task->cap != cap.  We better yield this Capability
625     // immediately and return to normaility.
626     if (ret == ThreadBlocked) {
627         IF_DEBUG(scheduler,
628                  sched_belch("--<< thread %d (%s) stopped: blocked\n",
629                              t->id, whatNext_strs[t->what_next]));
630         continue;
631     }
632 #endif
633
634     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
635
636     // ----------------------------------------------------------------------
637     
638     // Costs for the scheduler are assigned to CCS_SYSTEM
639 #if defined(PROFILING)
640     stopHeapProfTimer();
641     CCCS = CCS_SYSTEM;
642 #endif
643     
644 #if defined(THREADED_RTS)
645     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
646 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
647     IF_DEBUG(scheduler,debugBelch("sched: "););
648 #endif
649     
650     schedulePostRunThread();
651
652     ready_to_gc = rtsFalse;
653
654     switch (ret) {
655     case HeapOverflow:
656         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
657         break;
658
659     case StackOverflow:
660         scheduleHandleStackOverflow(cap,task,t);
661         break;
662
663     case ThreadYielding:
664         if (scheduleHandleYield(cap, t, prev_what_next)) {
665             // shortcut for switching between compiler/interpreter:
666             goto run_thread; 
667         }
668         break;
669
670     case ThreadBlocked:
671         scheduleHandleThreadBlocked(t);
672         break;
673
674     case ThreadFinished:
675         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
676         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
677         break;
678
679     default:
680       barf("schedule: invalid thread return code %d", (int)ret);
681     }
682
683     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
684     if (ready_to_gc) {
685       scheduleDoGC(cap,task,rtsFalse,GetRoots);
686 #if defined(THREADED_RTS)
687       cap = task->cap;    // reload cap, it might have changed  
688 #endif
689     }
690   } /* end of while() */
691
692   IF_PAR_DEBUG(verbose,
693                debugBelch("== Leaving schedule() after having received Finish\n"));
694 }
695
696 /* ----------------------------------------------------------------------------
697  * Setting up the scheduler loop
698  * ------------------------------------------------------------------------- */
699
700 static void
701 schedulePreLoop(void)
702 {
703 #if defined(GRAN) 
704     /* set up first event to get things going */
705     /* ToDo: assign costs for system setup and init MainTSO ! */
706     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
707               ContinueThread, 
708               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
709     
710     IF_DEBUG(gran,
711              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
712                         CurrentTSO);
713              G_TSO(CurrentTSO, 5));
714     
715     if (RtsFlags.GranFlags.Light) {
716         /* Save current time; GranSim Light only */
717         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
718     }      
719 #endif
720 }
721
722 /* -----------------------------------------------------------------------------
723  * schedulePushWork()
724  *
725  * Push work to other Capabilities if we have some.
726  * -------------------------------------------------------------------------- */
727
728 #if defined(THREADED_RTS)
729 static void
730 schedulePushWork(Capability *cap USED_IF_THREADS, 
731                  Task *task      USED_IF_THREADS)
732 {
733     Capability *free_caps[n_capabilities], *cap0;
734     nat i, n_free_caps;
735
736     // Check whether we have more threads on our run queue, or sparks
737     // in our pool, that we could hand to another Capability.
738     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
739         && sparkPoolSizeCap(cap) < 2) {
740         return;
741     }
742
743     // First grab as many free Capabilities as we can.
744     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
745         cap0 = &capabilities[i];
746         if (cap != cap0 && tryGrabCapability(cap0,task)) {
747             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
748                 // it already has some work, we just grabbed it at 
749                 // the wrong moment.  Or maybe it's deadlocked!
750                 releaseCapability(cap0);
751             } else {
752                 free_caps[n_free_caps++] = cap0;
753             }
754         }
755     }
756
757     // we now have n_free_caps free capabilities stashed in
758     // free_caps[].  Share our run queue equally with them.  This is
759     // probably the simplest thing we could do; improvements we might
760     // want to do include:
761     //
762     //   - giving high priority to moving relatively new threads, on 
763     //     the gournds that they haven't had time to build up a
764     //     working set in the cache on this CPU/Capability.
765     //
766     //   - giving low priority to moving long-lived threads
767
768     if (n_free_caps > 0) {
769         StgTSO *prev, *t, *next;
770         rtsBool pushed_to_all;
771
772         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
773
774         i = 0;
775         pushed_to_all = rtsFalse;
776
777         if (cap->run_queue_hd != END_TSO_QUEUE) {
778             prev = cap->run_queue_hd;
779             t = prev->link;
780             prev->link = END_TSO_QUEUE;
781             for (; t != END_TSO_QUEUE; t = next) {
782                 next = t->link;
783                 t->link = END_TSO_QUEUE;
784                 if (t->what_next == ThreadRelocated
785                     || t->bound == task) { // don't move my bound thread
786                     prev->link = t;
787                     prev = t;
788                 } else if (i == n_free_caps) {
789                     pushed_to_all = rtsTrue;
790                     i = 0;
791                     // keep one for us
792                     prev->link = t;
793                     prev = t;
794                 } else {
795                     IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
796                     appendToRunQueue(free_caps[i],t);
797                     if (t->bound) { t->bound->cap = free_caps[i]; }
798                     i++;
799                 }
800             }
801             cap->run_queue_tl = prev;
802         }
803
804         // If there are some free capabilities that we didn't push any
805         // threads to, then try to push a spark to each one.
806         if (!pushed_to_all) {
807             StgClosure *spark;
808             // i is the next free capability to push to
809             for (; i < n_free_caps; i++) {
810                 if (emptySparkPoolCap(free_caps[i])) {
811                     spark = findSpark(cap);
812                     if (spark != NULL) {
813                         IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
814                         newSpark(&(free_caps[i]->r), spark);
815                     }
816                 }
817             }
818         }
819
820         // release the capabilities
821         for (i = 0; i < n_free_caps; i++) {
822             task->cap = free_caps[i];
823             releaseCapability(free_caps[i]);
824         }
825     }
826     task->cap = cap; // reset to point to our Capability.
827 }
828 #endif
829
830 /* ----------------------------------------------------------------------------
831  * Start any pending signal handlers
832  * ------------------------------------------------------------------------- */
833
834 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
835 static void
836 scheduleStartSignalHandlers(Capability *cap)
837 {
838     if (signals_pending()) { // safe outside the lock
839         startSignalHandlers(cap);
840     }
841 }
842 #else
843 static void
844 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
845 {
846 }
847 #endif
848
849 /* ----------------------------------------------------------------------------
850  * Check for blocked threads that can be woken up.
851  * ------------------------------------------------------------------------- */
852
853 static void
854 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
855 {
856 #if !defined(THREADED_RTS)
857     //
858     // Check whether any waiting threads need to be woken up.  If the
859     // run queue is empty, and there are no other tasks running, we
860     // can wait indefinitely for something to happen.
861     //
862     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
863     {
864         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
865     }
866 #endif
867 }
868
869
870 /* ----------------------------------------------------------------------------
871  * Check for threads blocked on BLACKHOLEs that can be woken up
872  * ------------------------------------------------------------------------- */
873 static void
874 scheduleCheckBlackHoles (Capability *cap)
875 {
876     if ( blackholes_need_checking ) // check without the lock first
877     {
878         ACQUIRE_LOCK(&sched_mutex);
879         if ( blackholes_need_checking ) {
880             checkBlackHoles(cap);
881             blackholes_need_checking = rtsFalse;
882         }
883         RELEASE_LOCK(&sched_mutex);
884     }
885 }
886
887 /* ----------------------------------------------------------------------------
888  * Detect deadlock conditions and attempt to resolve them.
889  * ------------------------------------------------------------------------- */
890
891 static void
892 scheduleDetectDeadlock (Capability *cap, Task *task)
893 {
894
895 #if defined(PARALLEL_HASKELL)
896     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
897     return;
898 #endif
899
900     /* 
901      * Detect deadlock: when we have no threads to run, there are no
902      * threads blocked, waiting for I/O, or sleeping, and all the
903      * other tasks are waiting for work, we must have a deadlock of
904      * some description.
905      */
906     if ( emptyThreadQueues(cap) )
907     {
908 #if defined(THREADED_RTS)
909         /* 
910          * In the threaded RTS, we only check for deadlock if there
911          * has been no activity in a complete timeslice.  This means
912          * we won't eagerly start a full GC just because we don't have
913          * any threads to run currently.
914          */
915         if (recent_activity != ACTIVITY_INACTIVE) return;
916 #endif
917
918         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
919
920         // Garbage collection can release some new threads due to
921         // either (a) finalizers or (b) threads resurrected because
922         // they are unreachable and will therefore be sent an
923         // exception.  Any threads thus released will be immediately
924         // runnable.
925         scheduleDoGC( cap, task, rtsTrue/*force  major GC*/, GetRoots );
926 #if defined(THREADED_RTS)
927         cap = task->cap;    // reload cap, it might have changed
928 #endif
929
930         recent_activity = ACTIVITY_DONE_GC;
931         
932         if ( !emptyRunQueue(cap) ) return;
933
934 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
935         /* If we have user-installed signal handlers, then wait
936          * for signals to arrive rather then bombing out with a
937          * deadlock.
938          */
939         if ( anyUserHandlers() ) {
940             IF_DEBUG(scheduler, 
941                      sched_belch("still deadlocked, waiting for signals..."));
942
943             awaitUserSignals();
944
945             if (signals_pending()) {
946                 startSignalHandlers(cap);
947             }
948
949             // either we have threads to run, or we were interrupted:
950             ASSERT(!emptyRunQueue(cap) || interrupted);
951         }
952 #endif
953
954 #if !defined(THREADED_RTS)
955         /* Probably a real deadlock.  Send the current main thread the
956          * Deadlock exception.
957          */
958         if (task->tso) {
959             switch (task->tso->why_blocked) {
960             case BlockedOnSTM:
961             case BlockedOnBlackHole:
962             case BlockedOnException:
963             case BlockedOnMVar:
964                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
965                 return;
966             default:
967                 barf("deadlock: main thread blocked in a strange way");
968             }
969         }
970         return;
971 #endif
972     }
973 }
974
975 /* ----------------------------------------------------------------------------
976  * Process an event (GRAN only)
977  * ------------------------------------------------------------------------- */
978
979 #if defined(GRAN)
980 static StgTSO *
981 scheduleProcessEvent(rtsEvent *event)
982 {
983     StgTSO *t;
984
985     if (RtsFlags.GranFlags.Light)
986       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
987
988     /* adjust time based on time-stamp */
989     if (event->time > CurrentTime[CurrentProc] &&
990         event->evttype != ContinueThread)
991       CurrentTime[CurrentProc] = event->time;
992     
993     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
994     if (!RtsFlags.GranFlags.Light)
995       handleIdlePEs();
996
997     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
998
999     /* main event dispatcher in GranSim */
1000     switch (event->evttype) {
1001       /* Should just be continuing execution */
1002     case ContinueThread:
1003       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1004       /* ToDo: check assertion
1005       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1006              run_queue_hd != END_TSO_QUEUE);
1007       */
1008       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1009       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1010           procStatus[CurrentProc]==Fetching) {
1011         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1012               CurrentTSO->id, CurrentTSO, CurrentProc);
1013         goto next_thread;
1014       } 
1015       /* Ignore ContinueThreads for completed threads */
1016       if (CurrentTSO->what_next == ThreadComplete) {
1017         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1018               CurrentTSO->id, CurrentTSO, CurrentProc);
1019         goto next_thread;
1020       } 
1021       /* Ignore ContinueThreads for threads that are being migrated */
1022       if (PROCS(CurrentTSO)==Nowhere) { 
1023         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1024               CurrentTSO->id, CurrentTSO, CurrentProc);
1025         goto next_thread;
1026       }
1027       /* The thread should be at the beginning of the run queue */
1028       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1029         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1030               CurrentTSO->id, CurrentTSO, CurrentProc);
1031         break; // run the thread anyway
1032       }
1033       /*
1034       new_event(proc, proc, CurrentTime[proc],
1035                 FindWork,
1036                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1037       goto next_thread; 
1038       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1039       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1040
1041     case FetchNode:
1042       do_the_fetchnode(event);
1043       goto next_thread;             /* handle next event in event queue  */
1044       
1045     case GlobalBlock:
1046       do_the_globalblock(event);
1047       goto next_thread;             /* handle next event in event queue  */
1048       
1049     case FetchReply:
1050       do_the_fetchreply(event);
1051       goto next_thread;             /* handle next event in event queue  */
1052       
1053     case UnblockThread:   /* Move from the blocked queue to the tail of */
1054       do_the_unblock(event);
1055       goto next_thread;             /* handle next event in event queue  */
1056       
1057     case ResumeThread:  /* Move from the blocked queue to the tail of */
1058       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1059       event->tso->gran.blocktime += 
1060         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1061       do_the_startthread(event);
1062       goto next_thread;             /* handle next event in event queue  */
1063       
1064     case StartThread:
1065       do_the_startthread(event);
1066       goto next_thread;             /* handle next event in event queue  */
1067       
1068     case MoveThread:
1069       do_the_movethread(event);
1070       goto next_thread;             /* handle next event in event queue  */
1071       
1072     case MoveSpark:
1073       do_the_movespark(event);
1074       goto next_thread;             /* handle next event in event queue  */
1075       
1076     case FindWork:
1077       do_the_findwork(event);
1078       goto next_thread;             /* handle next event in event queue  */
1079       
1080     default:
1081       barf("Illegal event type %u\n", event->evttype);
1082     }  /* switch */
1083     
1084     /* This point was scheduler_loop in the old RTS */
1085
1086     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1087
1088     TimeOfLastEvent = CurrentTime[CurrentProc];
1089     TimeOfNextEvent = get_time_of_next_event();
1090     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1091     // CurrentTSO = ThreadQueueHd;
1092
1093     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1094                          TimeOfNextEvent));
1095
1096     if (RtsFlags.GranFlags.Light) 
1097       GranSimLight_leave_system(event, &ActiveTSO); 
1098
1099     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1100
1101     IF_DEBUG(gran, 
1102              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1103
1104     /* in a GranSim setup the TSO stays on the run queue */
1105     t = CurrentTSO;
1106     /* Take a thread from the run queue. */
1107     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1108
1109     IF_DEBUG(gran, 
1110              debugBelch("GRAN: About to run current thread, which is\n");
1111              G_TSO(t,5));
1112
1113     context_switch = 0; // turned on via GranYield, checking events and time slice
1114
1115     IF_DEBUG(gran, 
1116              DumpGranEvent(GR_SCHEDULE, t));
1117
1118     procStatus[CurrentProc] = Busy;
1119 }
1120 #endif // GRAN
1121
1122 /* ----------------------------------------------------------------------------
1123  * Send pending messages (PARALLEL_HASKELL only)
1124  * ------------------------------------------------------------------------- */
1125
1126 #if defined(PARALLEL_HASKELL)
1127 static StgTSO *
1128 scheduleSendPendingMessages(void)
1129 {
1130     StgSparkPool *pool;
1131     rtsSpark spark;
1132     StgTSO *t;
1133
1134 # if defined(PAR) // global Mem.Mgmt., omit for now
1135     if (PendingFetches != END_BF_QUEUE) {
1136         processFetches();
1137     }
1138 # endif
1139     
1140     if (RtsFlags.ParFlags.BufferTime) {
1141         // if we use message buffering, we must send away all message
1142         // packets which have become too old...
1143         sendOldBuffers(); 
1144     }
1145 }
1146 #endif
1147
1148 /* ----------------------------------------------------------------------------
1149  * Activate spark threads (PARALLEL_HASKELL only)
1150  * ------------------------------------------------------------------------- */
1151
1152 #if defined(PARALLEL_HASKELL)
1153 static void
1154 scheduleActivateSpark(void)
1155 {
1156 #if defined(SPARKS)
1157   ASSERT(emptyRunQueue());
1158 /* We get here if the run queue is empty and want some work.
1159    We try to turn a spark into a thread, and add it to the run queue,
1160    from where it will be picked up in the next iteration of the scheduler
1161    loop.
1162 */
1163
1164       /* :-[  no local threads => look out for local sparks */
1165       /* the spark pool for the current PE */
1166       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1167       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1168           pool->hd < pool->tl) {
1169         /* 
1170          * ToDo: add GC code check that we really have enough heap afterwards!!
1171          * Old comment:
1172          * If we're here (no runnable threads) and we have pending
1173          * sparks, we must have a space problem.  Get enough space
1174          * to turn one of those pending sparks into a
1175          * thread... 
1176          */
1177
1178         spark = findSpark(rtsFalse);            /* get a spark */
1179         if (spark != (rtsSpark) NULL) {
1180           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1181           IF_PAR_DEBUG(fish, // schedule,
1182                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1183                              tso->id, tso, advisory_thread_count));
1184
1185           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1186             IF_PAR_DEBUG(fish, // schedule,
1187                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1188                             spark));
1189             return rtsFalse; /* failed to generate a thread */
1190           }                  /* otherwise fall through & pick-up new tso */
1191         } else {
1192           IF_PAR_DEBUG(fish, // schedule,
1193                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1194                              spark_queue_len(pool)));
1195           return rtsFalse;  /* failed to generate a thread */
1196         }
1197         return rtsTrue;  /* success in generating a thread */
1198   } else { /* no more threads permitted or pool empty */
1199     return rtsFalse;  /* failed to generateThread */
1200   }
1201 #else
1202   tso = NULL; // avoid compiler warning only
1203   return rtsFalse;  /* dummy in non-PAR setup */
1204 #endif // SPARKS
1205 }
1206 #endif // PARALLEL_HASKELL
1207
1208 /* ----------------------------------------------------------------------------
1209  * Get work from a remote node (PARALLEL_HASKELL only)
1210  * ------------------------------------------------------------------------- */
1211     
1212 #if defined(PARALLEL_HASKELL)
1213 static rtsBool
1214 scheduleGetRemoteWork(rtsBool *receivedFinish)
1215 {
1216   ASSERT(emptyRunQueue());
1217
1218   if (RtsFlags.ParFlags.BufferTime) {
1219         IF_PAR_DEBUG(verbose, 
1220                 debugBelch("...send all pending data,"));
1221         {
1222           nat i;
1223           for (i=1; i<=nPEs; i++)
1224             sendImmediately(i); // send all messages away immediately
1225         }
1226   }
1227 # ifndef SPARKS
1228         //++EDEN++ idle() , i.e. send all buffers, wait for work
1229         // suppress fishing in EDEN... just look for incoming messages
1230         // (blocking receive)
1231   IF_PAR_DEBUG(verbose, 
1232                debugBelch("...wait for incoming messages...\n"));
1233   *receivedFinish = processMessages(); // blocking receive...
1234
1235         // and reenter scheduling loop after having received something
1236         // (return rtsFalse below)
1237
1238 # else /* activate SPARKS machinery */
1239 /* We get here, if we have no work, tried to activate a local spark, but still
1240    have no work. We try to get a remote spark, by sending a FISH message.
1241    Thread migration should be added here, and triggered when a sequence of 
1242    fishes returns without work. */
1243         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1244
1245       /* =8-[  no local sparks => look for work on other PEs */
1246         /*
1247          * We really have absolutely no work.  Send out a fish
1248          * (there may be some out there already), and wait for
1249          * something to arrive.  We clearly can't run any threads
1250          * until a SCHEDULE or RESUME arrives, and so that's what
1251          * we're hoping to see.  (Of course, we still have to
1252          * respond to other types of messages.)
1253          */
1254         rtsTime now = msTime() /*CURRENT_TIME*/;
1255         IF_PAR_DEBUG(verbose, 
1256                      debugBelch("--  now=%ld\n", now));
1257         IF_PAR_DEBUG(fish, // verbose,
1258              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1259                  (last_fish_arrived_at!=0 &&
1260                   last_fish_arrived_at+delay > now)) {
1261                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1262                      now, last_fish_arrived_at+delay, 
1263                      last_fish_arrived_at,
1264                      delay);
1265              });
1266   
1267         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1268             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1269           if (last_fish_arrived_at==0 ||
1270               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1271             /* outstandingFishes is set in sendFish, processFish;
1272                avoid flooding system with fishes via delay */
1273     next_fish_to_send_at = 0;  
1274   } else {
1275     /* ToDo: this should be done in the main scheduling loop to avoid the
1276              busy wait here; not so bad if fish delay is very small  */
1277     int iq = 0; // DEBUGGING -- HWL
1278     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1279     /* send a fish when ready, but process messages that arrive in the meantime */
1280     do {
1281       if (PacketsWaiting()) {
1282         iq++; // DEBUGGING
1283         *receivedFinish = processMessages();
1284       }
1285       now = msTime();
1286     } while (!*receivedFinish || now<next_fish_to_send_at);
1287     // JB: This means the fish could become obsolete, if we receive
1288     // work. Better check for work again? 
1289     // last line: while (!receivedFinish || !haveWork || now<...)
1290     // next line: if (receivedFinish || haveWork )
1291
1292     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1293       return rtsFalse;  // NB: this will leave scheduler loop
1294                         // immediately after return!
1295                           
1296     IF_PAR_DEBUG(fish, // verbose,
1297                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1298
1299   }
1300
1301     // JB: IMHO, this should all be hidden inside sendFish(...)
1302     /* pe = choosePE(); 
1303        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1304                 NEW_FISH_HUNGER);
1305
1306     // Global statistics: count no. of fishes
1307     if (RtsFlags.ParFlags.ParStats.Global &&
1308          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1309            globalParStats.tot_fish_mess++;
1310            }
1311     */ 
1312
1313   /* delayed fishes must have been sent by now! */
1314   next_fish_to_send_at = 0;  
1315   }
1316       
1317   *receivedFinish = processMessages();
1318 # endif /* SPARKS */
1319
1320  return rtsFalse;
1321  /* NB: this function always returns rtsFalse, meaning the scheduler
1322     loop continues with the next iteration; 
1323     rationale: 
1324       return code means success in finding work; we enter this function
1325       if there is no local work, thus have to send a fish which takes
1326       time until it arrives with work; in the meantime we should process
1327       messages in the main loop;
1328  */
1329 }
1330 #endif // PARALLEL_HASKELL
1331
1332 /* ----------------------------------------------------------------------------
1333  * PAR/GRAN: Report stats & debugging info(?)
1334  * ------------------------------------------------------------------------- */
1335
1336 #if defined(PAR) || defined(GRAN)
1337 static void
1338 scheduleGranParReport(void)
1339 {
1340   ASSERT(run_queue_hd != END_TSO_QUEUE);
1341
1342   /* Take a thread from the run queue, if we have work */
1343   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1344
1345     /* If this TSO has got its outport closed in the meantime, 
1346      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1347      * It has to be marked as TH_DEAD for this purpose.
1348      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1349
1350 JB: TODO: investigate wether state change field could be nuked
1351      entirely and replaced by the normal tso state (whatnext
1352      field). All we want to do is to kill tsos from outside.
1353      */
1354
1355     /* ToDo: write something to the log-file
1356     if (RTSflags.ParFlags.granSimStats && !sameThread)
1357         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1358
1359     CurrentTSO = t;
1360     */
1361     /* the spark pool for the current PE */
1362     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1363
1364     IF_DEBUG(scheduler, 
1365              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1366                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1367
1368     IF_PAR_DEBUG(fish,
1369              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1370                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1371
1372     if (RtsFlags.ParFlags.ParStats.Full && 
1373         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1374         (emitSchedule || // forced emit
1375          (t && LastTSO && t->id != LastTSO->id))) {
1376       /* 
1377          we are running a different TSO, so write a schedule event to log file
1378          NB: If we use fair scheduling we also have to write  a deschedule 
1379              event for LastTSO; with unfair scheduling we know that the
1380              previous tso has blocked whenever we switch to another tso, so
1381              we don't need it in GUM for now
1382       */
1383       IF_PAR_DEBUG(fish, // schedule,
1384                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1385
1386       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1387                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1388       emitSchedule = rtsFalse;
1389     }
1390 }     
1391 #endif
1392
1393 /* ----------------------------------------------------------------------------
1394  * After running a thread...
1395  * ------------------------------------------------------------------------- */
1396
1397 static void
1398 schedulePostRunThread(void)
1399 {
1400 #if defined(PAR)
1401     /* HACK 675: if the last thread didn't yield, make sure to print a 
1402        SCHEDULE event to the log file when StgRunning the next thread, even
1403        if it is the same one as before */
1404     LastTSO = t; 
1405     TimeOfLastYield = CURRENT_TIME;
1406 #endif
1407
1408   /* some statistics gathering in the parallel case */
1409
1410 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1411   switch (ret) {
1412     case HeapOverflow:
1413 # if defined(GRAN)
1414       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1415       globalGranStats.tot_heapover++;
1416 # elif defined(PAR)
1417       globalParStats.tot_heapover++;
1418 # endif
1419       break;
1420
1421      case StackOverflow:
1422 # if defined(GRAN)
1423       IF_DEBUG(gran, 
1424                DumpGranEvent(GR_DESCHEDULE, t));
1425       globalGranStats.tot_stackover++;
1426 # elif defined(PAR)
1427       // IF_DEBUG(par, 
1428       // DumpGranEvent(GR_DESCHEDULE, t);
1429       globalParStats.tot_stackover++;
1430 # endif
1431       break;
1432
1433     case ThreadYielding:
1434 # if defined(GRAN)
1435       IF_DEBUG(gran, 
1436                DumpGranEvent(GR_DESCHEDULE, t));
1437       globalGranStats.tot_yields++;
1438 # elif defined(PAR)
1439       // IF_DEBUG(par, 
1440       // DumpGranEvent(GR_DESCHEDULE, t);
1441       globalParStats.tot_yields++;
1442 # endif
1443       break; 
1444
1445     case ThreadBlocked:
1446 # if defined(GRAN)
1447       IF_DEBUG(scheduler,
1448                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1449                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1450                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1451                if (t->block_info.closure!=(StgClosure*)NULL)
1452                  print_bq(t->block_info.closure);
1453                debugBelch("\n"));
1454
1455       // ??? needed; should emit block before
1456       IF_DEBUG(gran, 
1457                DumpGranEvent(GR_DESCHEDULE, t)); 
1458       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1459       /*
1460         ngoq Dogh!
1461       ASSERT(procStatus[CurrentProc]==Busy || 
1462               ((procStatus[CurrentProc]==Fetching) && 
1463               (t->block_info.closure!=(StgClosure*)NULL)));
1464       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1465           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1466             procStatus[CurrentProc]==Fetching)) 
1467         procStatus[CurrentProc] = Idle;
1468       */
1469 # elif defined(PAR)
1470 //++PAR++  blockThread() writes the event (change?)
1471 # endif
1472     break;
1473
1474   case ThreadFinished:
1475     break;
1476
1477   default:
1478     barf("parGlobalStats: unknown return code");
1479     break;
1480     }
1481 #endif
1482 }
1483
1484 /* -----------------------------------------------------------------------------
1485  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1486  * -------------------------------------------------------------------------- */
1487
1488 static rtsBool
1489 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1490 {
1491     // did the task ask for a large block?
1492     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1493         // if so, get one and push it on the front of the nursery.
1494         bdescr *bd;
1495         lnat blocks;
1496         
1497         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1498         
1499         IF_DEBUG(scheduler,
1500                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1501                             (long)t->id, whatNext_strs[t->what_next], blocks));
1502         
1503         // don't do this if the nursery is (nearly) full, we'll GC first.
1504         if (cap->r.rCurrentNursery->link != NULL ||
1505             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1506                                                // if the nursery has only one block.
1507             
1508             ACQUIRE_SM_LOCK
1509             bd = allocGroup( blocks );
1510             RELEASE_SM_LOCK
1511             cap->r.rNursery->n_blocks += blocks;
1512             
1513             // link the new group into the list
1514             bd->link = cap->r.rCurrentNursery;
1515             bd->u.back = cap->r.rCurrentNursery->u.back;
1516             if (cap->r.rCurrentNursery->u.back != NULL) {
1517                 cap->r.rCurrentNursery->u.back->link = bd;
1518             } else {
1519 #if !defined(THREADED_RTS)
1520                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1521                        g0s0 == cap->r.rNursery);
1522 #endif
1523                 cap->r.rNursery->blocks = bd;
1524             }             
1525             cap->r.rCurrentNursery->u.back = bd;
1526             
1527             // initialise it as a nursery block.  We initialise the
1528             // step, gen_no, and flags field of *every* sub-block in
1529             // this large block, because this is easier than making
1530             // sure that we always find the block head of a large
1531             // block whenever we call Bdescr() (eg. evacuate() and
1532             // isAlive() in the GC would both have to do this, at
1533             // least).
1534             { 
1535                 bdescr *x;
1536                 for (x = bd; x < bd + blocks; x++) {
1537                     x->step = cap->r.rNursery;
1538                     x->gen_no = 0;
1539                     x->flags = 0;
1540                 }
1541             }
1542             
1543             // This assert can be a killer if the app is doing lots
1544             // of large block allocations.
1545             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1546             
1547             // now update the nursery to point to the new block
1548             cap->r.rCurrentNursery = bd;
1549             
1550             // we might be unlucky and have another thread get on the
1551             // run queue before us and steal the large block, but in that
1552             // case the thread will just end up requesting another large
1553             // block.
1554             pushOnRunQueue(cap,t);
1555             return rtsFalse;  /* not actually GC'ing */
1556         }
1557     }
1558     
1559     IF_DEBUG(scheduler,
1560              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1561                         (long)t->id, whatNext_strs[t->what_next]));
1562 #if defined(GRAN)
1563     ASSERT(!is_on_queue(t,CurrentProc));
1564 #elif defined(PARALLEL_HASKELL)
1565     /* Currently we emit a DESCHEDULE event before GC in GUM.
1566        ToDo: either add separate event to distinguish SYSTEM time from rest
1567        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1568     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1569         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1570                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1571         emitSchedule = rtsTrue;
1572     }
1573 #endif
1574       
1575     pushOnRunQueue(cap,t);
1576     return rtsTrue;
1577     /* actual GC is done at the end of the while loop in schedule() */
1578 }
1579
1580 /* -----------------------------------------------------------------------------
1581  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1582  * -------------------------------------------------------------------------- */
1583
1584 static void
1585 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1586 {
1587     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1588                                   (long)t->id, whatNext_strs[t->what_next]));
1589     /* just adjust the stack for this thread, then pop it back
1590      * on the run queue.
1591      */
1592     { 
1593         /* enlarge the stack */
1594         StgTSO *new_t = threadStackOverflow(cap, t);
1595         
1596         /* The TSO attached to this Task may have moved, so update the
1597          * pointer to it.
1598          */
1599         if (task->tso == t) {
1600             task->tso = new_t;
1601         }
1602         pushOnRunQueue(cap,new_t);
1603     }
1604 }
1605
1606 /* -----------------------------------------------------------------------------
1607  * Handle a thread that returned to the scheduler with ThreadYielding
1608  * -------------------------------------------------------------------------- */
1609
1610 static rtsBool
1611 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1612 {
1613     // Reset the context switch flag.  We don't do this just before
1614     // running the thread, because that would mean we would lose ticks
1615     // during GC, which can lead to unfair scheduling (a thread hogs
1616     // the CPU because the tick always arrives during GC).  This way
1617     // penalises threads that do a lot of allocation, but that seems
1618     // better than the alternative.
1619     context_switch = 0;
1620     
1621     /* put the thread back on the run queue.  Then, if we're ready to
1622      * GC, check whether this is the last task to stop.  If so, wake
1623      * up the GC thread.  getThread will block during a GC until the
1624      * GC is finished.
1625      */
1626     IF_DEBUG(scheduler,
1627              if (t->what_next != prev_what_next) {
1628                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1629                             (long)t->id, whatNext_strs[t->what_next]);
1630              } else {
1631                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1632                             (long)t->id, whatNext_strs[t->what_next]);
1633              }
1634         );
1635     
1636     IF_DEBUG(sanity,
1637              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1638              checkTSO(t));
1639     ASSERT(t->link == END_TSO_QUEUE);
1640     
1641     // Shortcut if we're just switching evaluators: don't bother
1642     // doing stack squeezing (which can be expensive), just run the
1643     // thread.
1644     if (t->what_next != prev_what_next) {
1645         return rtsTrue;
1646     }
1647     
1648 #if defined(GRAN)
1649     ASSERT(!is_on_queue(t,CurrentProc));
1650       
1651     IF_DEBUG(sanity,
1652              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1653              checkThreadQsSanity(rtsTrue));
1654
1655 #endif
1656
1657     addToRunQueue(cap,t);
1658
1659 #if defined(GRAN)
1660     /* add a ContinueThread event to actually process the thread */
1661     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1662               ContinueThread,
1663               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1664     IF_GRAN_DEBUG(bq, 
1665                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1666                   G_EVENTQ(0);
1667                   G_CURR_THREADQ(0));
1668 #endif
1669     return rtsFalse;
1670 }
1671
1672 /* -----------------------------------------------------------------------------
1673  * Handle a thread that returned to the scheduler with ThreadBlocked
1674  * -------------------------------------------------------------------------- */
1675
1676 static void
1677 scheduleHandleThreadBlocked( StgTSO *t
1678 #if !defined(GRAN) && !defined(DEBUG)
1679     STG_UNUSED
1680 #endif
1681     )
1682 {
1683 #if defined(GRAN)
1684     IF_DEBUG(scheduler,
1685              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1686                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1687              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1688     
1689     // ??? needed; should emit block before
1690     IF_DEBUG(gran, 
1691              DumpGranEvent(GR_DESCHEDULE, t)); 
1692     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1693     /*
1694       ngoq Dogh!
1695       ASSERT(procStatus[CurrentProc]==Busy || 
1696       ((procStatus[CurrentProc]==Fetching) && 
1697       (t->block_info.closure!=(StgClosure*)NULL)));
1698       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1699       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1700       procStatus[CurrentProc]==Fetching)) 
1701       procStatus[CurrentProc] = Idle;
1702     */
1703 #elif defined(PAR)
1704     IF_DEBUG(scheduler,
1705              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1706                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1707     IF_PAR_DEBUG(bq,
1708                  
1709                  if (t->block_info.closure!=(StgClosure*)NULL) 
1710                  print_bq(t->block_info.closure));
1711     
1712     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1713     blockThread(t);
1714     
1715     /* whatever we schedule next, we must log that schedule */
1716     emitSchedule = rtsTrue;
1717     
1718 #else /* !GRAN */
1719
1720       // We don't need to do anything.  The thread is blocked, and it
1721       // has tidied up its stack and placed itself on whatever queue
1722       // it needs to be on.
1723
1724 #if !defined(THREADED_RTS)
1725     ASSERT(t->why_blocked != NotBlocked);
1726              // This might not be true under THREADED_RTS: we don't have
1727              // exclusive access to this TSO, so someone might have
1728              // woken it up by now.  This actually happens: try
1729              // conc023 +RTS -N2.
1730 #endif
1731
1732     IF_DEBUG(scheduler,
1733              debugBelch("--<< thread %d (%s) stopped: ", 
1734                         t->id, whatNext_strs[t->what_next]);
1735              printThreadBlockage(t);
1736              debugBelch("\n"));
1737     
1738     /* Only for dumping event to log file 
1739        ToDo: do I need this in GranSim, too?
1740        blockThread(t);
1741     */
1742 #endif
1743 }
1744
1745 /* -----------------------------------------------------------------------------
1746  * Handle a thread that returned to the scheduler with ThreadFinished
1747  * -------------------------------------------------------------------------- */
1748
1749 static rtsBool
1750 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1751 {
1752     /* Need to check whether this was a main thread, and if so,
1753      * return with the return value.
1754      *
1755      * We also end up here if the thread kills itself with an
1756      * uncaught exception, see Exception.cmm.
1757      */
1758     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1759                                   t->id, whatNext_strs[t->what_next]));
1760
1761 #if defined(GRAN)
1762       endThread(t, CurrentProc); // clean-up the thread
1763 #elif defined(PARALLEL_HASKELL)
1764       /* For now all are advisory -- HWL */
1765       //if(t->priority==AdvisoryPriority) ??
1766       advisory_thread_count--; // JB: Caution with this counter, buggy!
1767       
1768 # if defined(DIST)
1769       if(t->dist.priority==RevalPriority)
1770         FinishReval(t);
1771 # endif
1772     
1773 # if defined(EDENOLD)
1774       // the thread could still have an outport... (BUG)
1775       if (t->eden.outport != -1) {
1776       // delete the outport for the tso which has finished...
1777         IF_PAR_DEBUG(eden_ports,
1778                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1779                               t->eden.outport, t->id));
1780         deleteOPT(t);
1781       }
1782       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1783       if (t->eden.epid != -1) {
1784         IF_PAR_DEBUG(eden_ports,
1785                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1786                            t->id, t->eden.epid));
1787         removeTSOfromProcess(t);
1788       }
1789 # endif 
1790
1791 # if defined(PAR)
1792       if (RtsFlags.ParFlags.ParStats.Full &&
1793           !RtsFlags.ParFlags.ParStats.Suppressed) 
1794         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1795
1796       //  t->par only contains statistics: left out for now...
1797       IF_PAR_DEBUG(fish,
1798                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1799                               t->id,t,t->par.sparkname));
1800 # endif
1801 #endif // PARALLEL_HASKELL
1802
1803       //
1804       // Check whether the thread that just completed was a bound
1805       // thread, and if so return with the result.  
1806       //
1807       // There is an assumption here that all thread completion goes
1808       // through this point; we need to make sure that if a thread
1809       // ends up in the ThreadKilled state, that it stays on the run
1810       // queue so it can be dealt with here.
1811       //
1812
1813       if (t->bound) {
1814
1815           if (t->bound != task) {
1816 #if !defined(THREADED_RTS)
1817               // Must be a bound thread that is not the topmost one.  Leave
1818               // it on the run queue until the stack has unwound to the
1819               // point where we can deal with this.  Leaving it on the run
1820               // queue also ensures that the garbage collector knows about
1821               // this thread and its return value (it gets dropped from the
1822               // all_threads list so there's no other way to find it).
1823               appendToRunQueue(cap,t);
1824               return rtsFalse;
1825 #else
1826               // this cannot happen in the threaded RTS, because a
1827               // bound thread can only be run by the appropriate Task.
1828               barf("finished bound thread that isn't mine");
1829 #endif
1830           }
1831
1832           ASSERT(task->tso == t);
1833
1834           if (t->what_next == ThreadComplete) {
1835               if (task->ret) {
1836                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1837                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1838               }
1839               task->stat = Success;
1840           } else {
1841               if (task->ret) {
1842                   *(task->ret) = NULL;
1843               }
1844               if (interrupted) {
1845                   task->stat = Interrupted;
1846               } else {
1847                   task->stat = Killed;
1848               }
1849           }
1850 #ifdef DEBUG
1851           removeThreadLabel((StgWord)task->tso->id);
1852 #endif
1853           return rtsTrue; // tells schedule() to return
1854       }
1855
1856       return rtsFalse;
1857 }
1858
1859 /* -----------------------------------------------------------------------------
1860  * Perform a heap census, if PROFILING
1861  * -------------------------------------------------------------------------- */
1862
1863 static rtsBool
1864 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1865 {
1866 #if defined(PROFILING)
1867     // When we have +RTS -i0 and we're heap profiling, do a census at
1868     // every GC.  This lets us get repeatable runs for debugging.
1869     if (performHeapProfile ||
1870         (RtsFlags.ProfFlags.profileInterval==0 &&
1871          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1872
1873         // checking black holes is necessary before GC, otherwise
1874         // there may be threads that are unreachable except by the
1875         // blackhole queue, which the GC will consider to be
1876         // deadlocked.
1877         scheduleCheckBlackHoles(&MainCapability);
1878
1879         IF_DEBUG(scheduler, sched_belch("garbage collecting before heap census"));
1880         GarbageCollect(GetRoots, rtsTrue);
1881
1882         IF_DEBUG(scheduler, sched_belch("performing heap census"));
1883         heapCensus();
1884
1885         performHeapProfile = rtsFalse;
1886         return rtsTrue;  // true <=> we already GC'd
1887     }
1888 #endif
1889     return rtsFalse;
1890 }
1891
1892 /* -----------------------------------------------------------------------------
1893  * Perform a garbage collection if necessary
1894  * -------------------------------------------------------------------------- */
1895
1896 static void
1897 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
1898               rtsBool force_major, void (*get_roots)(evac_fn))
1899 {
1900     StgTSO *t;
1901 #ifdef THREADED_RTS
1902     static volatile StgWord waiting_for_gc;
1903     rtsBool was_waiting;
1904     nat i;
1905 #endif
1906
1907 #ifdef THREADED_RTS
1908     // In order to GC, there must be no threads running Haskell code.
1909     // Therefore, the GC thread needs to hold *all* the capabilities,
1910     // and release them after the GC has completed.  
1911     //
1912     // This seems to be the simplest way: previous attempts involved
1913     // making all the threads with capabilities give up their
1914     // capabilities and sleep except for the *last* one, which
1915     // actually did the GC.  But it's quite hard to arrange for all
1916     // the other tasks to sleep and stay asleep.
1917     //
1918         
1919     was_waiting = cas(&waiting_for_gc, 0, 1);
1920     if (was_waiting) {
1921         do {
1922             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1923             if (cap) yieldCapability(&cap,task);
1924         } while (waiting_for_gc);
1925         return;  // NOTE: task->cap might have changed here
1926     }
1927
1928     for (i=0; i < n_capabilities; i++) {
1929         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1930         if (cap != &capabilities[i]) {
1931             Capability *pcap = &capabilities[i];
1932             // we better hope this task doesn't get migrated to
1933             // another Capability while we're waiting for this one.
1934             // It won't, because load balancing happens while we have
1935             // all the Capabilities, but even so it's a slightly
1936             // unsavoury invariant.
1937             task->cap = pcap;
1938             context_switch = 1;
1939             waitForReturnCapability(&pcap, task);
1940             if (pcap != &capabilities[i]) {
1941                 barf("scheduleDoGC: got the wrong capability");
1942             }
1943         }
1944     }
1945
1946     waiting_for_gc = rtsFalse;
1947 #endif
1948
1949     /* Kick any transactions which are invalid back to their
1950      * atomically frames.  When next scheduled they will try to
1951      * commit, this commit will fail and they will retry.
1952      */
1953     { 
1954         StgTSO *next;
1955
1956         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1957             if (t->what_next == ThreadRelocated) {
1958                 next = t->link;
1959             } else {
1960                 next = t->global_link;
1961                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1962                     if (!stmValidateNestOfTransactions (t -> trec)) {
1963                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1964                         
1965                         // strip the stack back to the
1966                         // ATOMICALLY_FRAME, aborting the (nested)
1967                         // transaction, and saving the stack of any
1968                         // partially-evaluated thunks on the heap.
1969                         raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL);
1970                         
1971 #ifdef REG_R1
1972                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1973 #endif
1974                     }
1975                 }
1976             }
1977         }
1978     }
1979     
1980     // so this happens periodically:
1981     if (cap) scheduleCheckBlackHoles(cap);
1982     
1983     IF_DEBUG(scheduler, printAllThreads());
1984
1985     /* everybody back, start the GC.
1986      * Could do it in this thread, or signal a condition var
1987      * to do it in another thread.  Either way, we need to
1988      * broadcast on gc_pending_cond afterward.
1989      */
1990 #if defined(THREADED_RTS)
1991     IF_DEBUG(scheduler,sched_belch("doing GC"));
1992 #endif
1993     GarbageCollect(get_roots, force_major);
1994     
1995 #if defined(THREADED_RTS)
1996     // release our stash of capabilities.
1997     for (i = 0; i < n_capabilities; i++) {
1998         if (cap != &capabilities[i]) {
1999             task->cap = &capabilities[i];
2000             releaseCapability(&capabilities[i]);
2001         }
2002     }
2003     if (cap) {
2004         task->cap = cap;
2005     } else {
2006         task->cap = NULL;
2007     }
2008 #endif
2009
2010 #if defined(GRAN)
2011     /* add a ContinueThread event to continue execution of current thread */
2012     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2013               ContinueThread,
2014               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2015     IF_GRAN_DEBUG(bq, 
2016                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2017                   G_EVENTQ(0);
2018                   G_CURR_THREADQ(0));
2019 #endif /* GRAN */
2020 }
2021
2022 /* ---------------------------------------------------------------------------
2023  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
2024  * used by Control.Concurrent for error checking.
2025  * ------------------------------------------------------------------------- */
2026  
2027 StgBool
2028 rtsSupportsBoundThreads(void)
2029 {
2030 #if defined(THREADED_RTS)
2031   return rtsTrue;
2032 #else
2033   return rtsFalse;
2034 #endif
2035 }
2036
2037 /* ---------------------------------------------------------------------------
2038  * isThreadBound(tso): check whether tso is bound to an OS thread.
2039  * ------------------------------------------------------------------------- */
2040  
2041 StgBool
2042 isThreadBound(StgTSO* tso USED_IF_THREADS)
2043 {
2044 #if defined(THREADED_RTS)
2045   return (tso->bound != NULL);
2046 #endif
2047   return rtsFalse;
2048 }
2049
2050 /* ---------------------------------------------------------------------------
2051  * Singleton fork(). Do not copy any running threads.
2052  * ------------------------------------------------------------------------- */
2053
2054 #if !defined(mingw32_HOST_OS)
2055 #define FORKPROCESS_PRIMOP_SUPPORTED
2056 #endif
2057
2058 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2059 static void 
2060 deleteThreadImmediately(Capability *cap, StgTSO *tso);
2061 #endif
2062 StgInt
2063 forkProcess(HsStablePtr *entry
2064 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2065             STG_UNUSED
2066 #endif
2067            )
2068 {
2069 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2070     Task *task;
2071     pid_t pid;
2072     StgTSO* t,*next;
2073     Capability *cap;
2074     
2075 #if defined(THREADED_RTS)
2076     if (RtsFlags.ParFlags.nNodes > 1) {
2077         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2078         stg_exit(EXIT_FAILURE);
2079     }
2080 #endif
2081
2082     IF_DEBUG(scheduler,sched_belch("forking!"));
2083     
2084     // ToDo: for SMP, we should probably acquire *all* the capabilities
2085     cap = rts_lock();
2086     
2087     pid = fork();
2088     
2089     if (pid) { // parent
2090         
2091         // just return the pid
2092         rts_unlock(cap);
2093         return pid;
2094         
2095     } else { // child
2096         
2097         // delete all threads
2098         cap->run_queue_hd = END_TSO_QUEUE;
2099         cap->run_queue_tl = END_TSO_QUEUE;
2100         
2101         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2102             next = t->link;
2103             
2104             // don't allow threads to catch the ThreadKilled exception
2105             deleteThreadImmediately(cap,t);
2106         }
2107         
2108         // wipe the task list
2109         ACQUIRE_LOCK(&sched_mutex);
2110         for (task = all_tasks; task != NULL; task=task->all_link) {
2111             if (task != cap->running_task) discardTask(task);
2112         }
2113         RELEASE_LOCK(&sched_mutex);
2114
2115         cap->suspended_ccalling_tasks = NULL;
2116
2117 #if defined(THREADED_RTS)
2118         // wipe our spare workers list.
2119         cap->spare_workers = NULL;
2120         cap->returning_tasks_hd = NULL;
2121         cap->returning_tasks_tl = NULL;
2122 #endif
2123
2124         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2125         rts_checkSchedStatus("forkProcess",cap);
2126         
2127         rts_unlock(cap);
2128         hs_exit();                      // clean up and exit
2129         stg_exit(EXIT_SUCCESS);
2130     }
2131 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2132     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2133     return -1;
2134 #endif
2135 }
2136
2137 /* ---------------------------------------------------------------------------
2138  * Delete the threads on the run queue of the current capability.
2139  * ------------------------------------------------------------------------- */
2140    
2141 static void
2142 deleteRunQueue (Capability *cap)
2143 {
2144     StgTSO *t, *next;
2145     for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2146         ASSERT(t->what_next != ThreadRelocated);
2147         next = t->link;
2148         deleteThread(cap, t);
2149     }
2150 }
2151
2152 /* startThread and  insertThread are now in GranSim.c -- HWL */
2153
2154
2155 /* -----------------------------------------------------------------------------
2156    Managing the suspended_ccalling_tasks list.
2157    Locks required: sched_mutex
2158    -------------------------------------------------------------------------- */
2159
2160 STATIC_INLINE void
2161 suspendTask (Capability *cap, Task *task)
2162 {
2163     ASSERT(task->next == NULL && task->prev == NULL);
2164     task->next = cap->suspended_ccalling_tasks;
2165     task->prev = NULL;
2166     if (cap->suspended_ccalling_tasks) {
2167         cap->suspended_ccalling_tasks->prev = task;
2168     }
2169     cap->suspended_ccalling_tasks = task;
2170 }
2171
2172 STATIC_INLINE void
2173 recoverSuspendedTask (Capability *cap, Task *task)
2174 {
2175     if (task->prev) {
2176         task->prev->next = task->next;
2177     } else {
2178         ASSERT(cap->suspended_ccalling_tasks == task);
2179         cap->suspended_ccalling_tasks = task->next;
2180     }
2181     if (task->next) {
2182         task->next->prev = task->prev;
2183     }
2184     task->next = task->prev = NULL;
2185 }
2186
2187 /* ---------------------------------------------------------------------------
2188  * Suspending & resuming Haskell threads.
2189  * 
2190  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2191  * its capability before calling the C function.  This allows another
2192  * task to pick up the capability and carry on running Haskell
2193  * threads.  It also means that if the C call blocks, it won't lock
2194  * the whole system.
2195  *
2196  * The Haskell thread making the C call is put to sleep for the
2197  * duration of the call, on the susepended_ccalling_threads queue.  We
2198  * give out a token to the task, which it can use to resume the thread
2199  * on return from the C function.
2200  * ------------------------------------------------------------------------- */
2201    
2202 void *
2203 suspendThread (StgRegTable *reg)
2204 {
2205   Capability *cap;
2206   int saved_errno = errno;
2207   StgTSO *tso;
2208   Task *task;
2209
2210   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2211    */
2212   cap = regTableToCapability(reg);
2213
2214   task = cap->running_task;
2215   tso = cap->r.rCurrentTSO;
2216
2217   IF_DEBUG(scheduler,
2218            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2219
2220   // XXX this might not be necessary --SDM
2221   tso->what_next = ThreadRunGHC;
2222
2223   threadPaused(cap,tso);
2224
2225   if(tso->blocked_exceptions == NULL)  {
2226       tso->why_blocked = BlockedOnCCall;
2227       tso->blocked_exceptions = END_TSO_QUEUE;
2228   } else {
2229       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2230   }
2231
2232   // Hand back capability
2233   task->suspended_tso = tso;
2234
2235   ACQUIRE_LOCK(&cap->lock);
2236
2237   suspendTask(cap,task);
2238   cap->in_haskell = rtsFalse;
2239   releaseCapability_(cap);
2240   
2241   RELEASE_LOCK(&cap->lock);
2242
2243 #if defined(THREADED_RTS)
2244   /* Preparing to leave the RTS, so ensure there's a native thread/task
2245      waiting to take over.
2246   */
2247   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2248 #endif
2249
2250   errno = saved_errno;
2251   return task;
2252 }
2253
2254 StgRegTable *
2255 resumeThread (void *task_)
2256 {
2257     StgTSO *tso;
2258     Capability *cap;
2259     int saved_errno = errno;
2260     Task *task = task_;
2261
2262     cap = task->cap;
2263     // Wait for permission to re-enter the RTS with the result.
2264     waitForReturnCapability(&cap,task);
2265     // we might be on a different capability now... but if so, our
2266     // entry on the suspended_ccalling_tasks list will also have been
2267     // migrated.
2268
2269     // Remove the thread from the suspended list
2270     recoverSuspendedTask(cap,task);
2271
2272     tso = task->suspended_tso;
2273     task->suspended_tso = NULL;
2274     tso->link = END_TSO_QUEUE;
2275     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2276     
2277     if (tso->why_blocked == BlockedOnCCall) {
2278         awakenBlockedQueue(cap,tso->blocked_exceptions);
2279         tso->blocked_exceptions = NULL;
2280     }
2281     
2282     /* Reset blocking status */
2283     tso->why_blocked  = NotBlocked;
2284     
2285     cap->r.rCurrentTSO = tso;
2286     cap->in_haskell = rtsTrue;
2287     errno = saved_errno;
2288
2289     /* We might have GC'd, mark the TSO dirty again */
2290     dirtyTSO(tso);
2291
2292     return &cap->r;
2293 }
2294
2295 /* ---------------------------------------------------------------------------
2296  * Comparing Thread ids.
2297  *
2298  * This is used from STG land in the implementation of the
2299  * instances of Eq/Ord for ThreadIds.
2300  * ------------------------------------------------------------------------ */
2301
2302 int
2303 cmp_thread(StgPtr tso1, StgPtr tso2) 
2304
2305   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2306   StgThreadID id2 = ((StgTSO *)tso2)->id;
2307  
2308   if (id1 < id2) return (-1);
2309   if (id1 > id2) return 1;
2310   return 0;
2311 }
2312
2313 /* ---------------------------------------------------------------------------
2314  * Fetching the ThreadID from an StgTSO.
2315  *
2316  * This is used in the implementation of Show for ThreadIds.
2317  * ------------------------------------------------------------------------ */
2318 int
2319 rts_getThreadId(StgPtr tso) 
2320 {
2321   return ((StgTSO *)tso)->id;
2322 }
2323
2324 #ifdef DEBUG
2325 void
2326 labelThread(StgPtr tso, char *label)
2327 {
2328   int len;
2329   void *buf;
2330
2331   /* Caveat: Once set, you can only set the thread name to "" */
2332   len = strlen(label)+1;
2333   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2334   strncpy(buf,label,len);
2335   /* Update will free the old memory for us */
2336   updateThreadLabel(((StgTSO *)tso)->id,buf);
2337 }
2338 #endif /* DEBUG */
2339
2340 /* ---------------------------------------------------------------------------
2341    Create a new thread.
2342
2343    The new thread starts with the given stack size.  Before the
2344    scheduler can run, however, this thread needs to have a closure
2345    (and possibly some arguments) pushed on its stack.  See
2346    pushClosure() in Schedule.h.
2347
2348    createGenThread() and createIOThread() (in SchedAPI.h) are
2349    convenient packaged versions of this function.
2350
2351    currently pri (priority) is only used in a GRAN setup -- HWL
2352    ------------------------------------------------------------------------ */
2353 #if defined(GRAN)
2354 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2355 StgTSO *
2356 createThread(nat size, StgInt pri)
2357 #else
2358 StgTSO *
2359 createThread(Capability *cap, nat size)
2360 #endif
2361 {
2362     StgTSO *tso;
2363     nat stack_size;
2364
2365     /* sched_mutex is *not* required */
2366
2367     /* First check whether we should create a thread at all */
2368 #if defined(PARALLEL_HASKELL)
2369     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2370     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2371         threadsIgnored++;
2372         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2373                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2374         return END_TSO_QUEUE;
2375     }
2376     threadsCreated++;
2377 #endif
2378
2379 #if defined(GRAN)
2380     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2381 #endif
2382
2383     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2384
2385     /* catch ridiculously small stack sizes */
2386     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2387         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2388     }
2389
2390     stack_size = size - TSO_STRUCT_SIZEW;
2391     
2392     tso = (StgTSO *)allocateLocal(cap, size);
2393     TICK_ALLOC_TSO(stack_size, 0);
2394
2395     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2396 #if defined(GRAN)
2397     SET_GRAN_HDR(tso, ThisPE);
2398 #endif
2399
2400     // Always start with the compiled code evaluator
2401     tso->what_next = ThreadRunGHC;
2402
2403     tso->why_blocked  = NotBlocked;
2404     tso->blocked_exceptions = NULL;
2405     tso->flags = TSO_DIRTY;
2406     
2407     tso->saved_errno = 0;
2408     tso->bound = NULL;
2409     
2410     tso->stack_size     = stack_size;
2411     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2412                           - TSO_STRUCT_SIZEW;
2413     tso->sp             = (P_)&(tso->stack) + stack_size;
2414
2415     tso->trec = NO_TREC;
2416     
2417 #ifdef PROFILING
2418     tso->prof.CCCS = CCS_MAIN;
2419 #endif
2420     
2421   /* put a stop frame on the stack */
2422     tso->sp -= sizeofW(StgStopFrame);
2423     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2424     tso->link = END_TSO_QUEUE;
2425     
2426   // ToDo: check this
2427 #if defined(GRAN)
2428     /* uses more flexible routine in GranSim */
2429     insertThread(tso, CurrentProc);
2430 #else
2431     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2432      * from its creation
2433      */
2434 #endif
2435     
2436 #if defined(GRAN) 
2437     if (RtsFlags.GranFlags.GranSimStats.Full) 
2438         DumpGranEvent(GR_START,tso);
2439 #elif defined(PARALLEL_HASKELL)
2440     if (RtsFlags.ParFlags.ParStats.Full) 
2441         DumpGranEvent(GR_STARTQ,tso);
2442     /* HACk to avoid SCHEDULE 
2443        LastTSO = tso; */
2444 #endif
2445     
2446     /* Link the new thread on the global thread list.
2447      */
2448     ACQUIRE_LOCK(&sched_mutex);
2449     tso->id = next_thread_id++;  // while we have the mutex
2450     tso->global_link = all_threads;
2451     all_threads = tso;
2452     RELEASE_LOCK(&sched_mutex);
2453     
2454 #if defined(DIST)
2455     tso->dist.priority = MandatoryPriority; //by default that is...
2456 #endif
2457     
2458 #if defined(GRAN)
2459     tso->gran.pri = pri;
2460 # if defined(DEBUG)
2461     tso->gran.magic = TSO_MAGIC; // debugging only
2462 # endif
2463     tso->gran.sparkname   = 0;
2464     tso->gran.startedat   = CURRENT_TIME; 
2465     tso->gran.exported    = 0;
2466     tso->gran.basicblocks = 0;
2467     tso->gran.allocs      = 0;
2468     tso->gran.exectime    = 0;
2469     tso->gran.fetchtime   = 0;
2470     tso->gran.fetchcount  = 0;
2471     tso->gran.blocktime   = 0;
2472     tso->gran.blockcount  = 0;
2473     tso->gran.blockedat   = 0;
2474     tso->gran.globalsparks = 0;
2475     tso->gran.localsparks  = 0;
2476     if (RtsFlags.GranFlags.Light)
2477         tso->gran.clock  = Now; /* local clock */
2478     else
2479         tso->gran.clock  = 0;
2480     
2481     IF_DEBUG(gran,printTSO(tso));
2482 #elif defined(PARALLEL_HASKELL)
2483 # if defined(DEBUG)
2484     tso->par.magic = TSO_MAGIC; // debugging only
2485 # endif
2486     tso->par.sparkname   = 0;
2487     tso->par.startedat   = CURRENT_TIME; 
2488     tso->par.exported    = 0;
2489     tso->par.basicblocks = 0;
2490     tso->par.allocs      = 0;
2491     tso->par.exectime    = 0;
2492     tso->par.fetchtime   = 0;
2493     tso->par.fetchcount  = 0;
2494     tso->par.blocktime   = 0;
2495     tso->par.blockcount  = 0;
2496     tso->par.blockedat   = 0;
2497     tso->par.globalsparks = 0;
2498     tso->par.localsparks  = 0;
2499 #endif
2500     
2501 #if defined(GRAN)
2502     globalGranStats.tot_threads_created++;
2503     globalGranStats.threads_created_on_PE[CurrentProc]++;
2504     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2505     globalGranStats.tot_sq_probes++;
2506 #elif defined(PARALLEL_HASKELL)
2507     // collect parallel global statistics (currently done together with GC stats)
2508     if (RtsFlags.ParFlags.ParStats.Global &&
2509         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2510         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2511         globalParStats.tot_threads_created++;
2512     }
2513 #endif 
2514     
2515 #if defined(GRAN)
2516     IF_GRAN_DEBUG(pri,
2517                   sched_belch("==__ schedule: Created TSO %d (%p);",
2518                               CurrentProc, tso, tso->id));
2519 #elif defined(PARALLEL_HASKELL)
2520     IF_PAR_DEBUG(verbose,
2521                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2522                              (long)tso->id, tso, advisory_thread_count));
2523 #else
2524     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2525                                    (long)tso->id, (long)tso->stack_size));
2526 #endif    
2527     return tso;
2528 }
2529
2530 #if defined(PAR)
2531 /* RFP:
2532    all parallel thread creation calls should fall through the following routine.
2533 */
2534 StgTSO *
2535 createThreadFromSpark(rtsSpark spark) 
2536 { StgTSO *tso;
2537   ASSERT(spark != (rtsSpark)NULL);
2538 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2539   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2540   { threadsIgnored++;
2541     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2542           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2543     return END_TSO_QUEUE;
2544   }
2545   else
2546   { threadsCreated++;
2547     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2548     if (tso==END_TSO_QUEUE)     
2549       barf("createSparkThread: Cannot create TSO");
2550 #if defined(DIST)
2551     tso->priority = AdvisoryPriority;
2552 #endif
2553     pushClosure(tso,spark);
2554     addToRunQueue(tso);
2555     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2556   }
2557   return tso;
2558 }
2559 #endif
2560
2561 /*
2562   Turn a spark into a thread.
2563   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2564 */
2565 #if 0
2566 StgTSO *
2567 activateSpark (rtsSpark spark) 
2568 {
2569   StgTSO *tso;
2570
2571   tso = createSparkThread(spark);
2572   if (RtsFlags.ParFlags.ParStats.Full) {   
2573     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2574       IF_PAR_DEBUG(verbose,
2575                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2576                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2577   }
2578   // ToDo: fwd info on local/global spark to thread -- HWL
2579   // tso->gran.exported =  spark->exported;
2580   // tso->gran.locked =   !spark->global;
2581   // tso->gran.sparkname = spark->name;
2582
2583   return tso;
2584 }
2585 #endif
2586
2587 /* ---------------------------------------------------------------------------
2588  * scheduleThread()
2589  *
2590  * scheduleThread puts a thread on the end  of the runnable queue.
2591  * This will usually be done immediately after a thread is created.
2592  * The caller of scheduleThread must create the thread using e.g.
2593  * createThread and push an appropriate closure
2594  * on this thread's stack before the scheduler is invoked.
2595  * ------------------------------------------------------------------------ */
2596
2597 void
2598 scheduleThread(Capability *cap, StgTSO *tso)
2599 {
2600     // The thread goes at the *end* of the run-queue, to avoid possible
2601     // starvation of any threads already on the queue.
2602     appendToRunQueue(cap,tso);
2603 }
2604
2605 Capability *
2606 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2607 {
2608     Task *task;
2609
2610     // We already created/initialised the Task
2611     task = cap->running_task;
2612
2613     // This TSO is now a bound thread; make the Task and TSO
2614     // point to each other.
2615     tso->bound = task;
2616
2617     task->tso = tso;
2618     task->ret = ret;
2619     task->stat = NoStatus;
2620
2621     appendToRunQueue(cap,tso);
2622
2623     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2624
2625 #if defined(GRAN)
2626     /* GranSim specific init */
2627     CurrentTSO = m->tso;                // the TSO to run
2628     procStatus[MainProc] = Busy;        // status of main PE
2629     CurrentProc = MainProc;             // PE to run it on
2630 #endif
2631
2632     cap = schedule(cap,task);
2633
2634     ASSERT(task->stat != NoStatus);
2635     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2636
2637     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2638     return cap;
2639 }
2640
2641 /* ----------------------------------------------------------------------------
2642  * Starting Tasks
2643  * ------------------------------------------------------------------------- */
2644
2645 #if defined(THREADED_RTS)
2646 void
2647 workerStart(Task *task)
2648 {
2649     Capability *cap;
2650
2651     // See startWorkerTask().
2652     ACQUIRE_LOCK(&task->lock);
2653     cap = task->cap;
2654     RELEASE_LOCK(&task->lock);
2655
2656     // set the thread-local pointer to the Task:
2657     taskEnter(task);
2658
2659     // schedule() runs without a lock.
2660     cap = schedule(cap,task);
2661
2662     // On exit from schedule(), we have a Capability.
2663     releaseCapability(cap);
2664     taskStop(task);
2665 }
2666 #endif
2667
2668 /* ---------------------------------------------------------------------------
2669  * initScheduler()
2670  *
2671  * Initialise the scheduler.  This resets all the queues - if the
2672  * queues contained any threads, they'll be garbage collected at the
2673  * next pass.
2674  *
2675  * ------------------------------------------------------------------------ */
2676
2677 void 
2678 initScheduler(void)
2679 {
2680 #if defined(GRAN)
2681   nat i;
2682   for (i=0; i<=MAX_PROC; i++) {
2683     run_queue_hds[i]      = END_TSO_QUEUE;
2684     run_queue_tls[i]      = END_TSO_QUEUE;
2685     blocked_queue_hds[i]  = END_TSO_QUEUE;
2686     blocked_queue_tls[i]  = END_TSO_QUEUE;
2687     ccalling_threadss[i]  = END_TSO_QUEUE;
2688     blackhole_queue[i]    = END_TSO_QUEUE;
2689     sleeping_queue        = END_TSO_QUEUE;
2690   }
2691 #elif !defined(THREADED_RTS)
2692   blocked_queue_hd  = END_TSO_QUEUE;
2693   blocked_queue_tl  = END_TSO_QUEUE;
2694   sleeping_queue    = END_TSO_QUEUE;
2695 #endif
2696
2697   blackhole_queue   = END_TSO_QUEUE;
2698   all_threads       = END_TSO_QUEUE;
2699
2700   context_switch = 0;
2701   interrupted    = 0;
2702
2703   RtsFlags.ConcFlags.ctxtSwitchTicks =
2704       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2705       
2706 #if defined(THREADED_RTS)
2707   /* Initialise the mutex and condition variables used by
2708    * the scheduler. */
2709   initMutex(&sched_mutex);
2710 #endif
2711   
2712   ACQUIRE_LOCK(&sched_mutex);
2713
2714   /* A capability holds the state a native thread needs in
2715    * order to execute STG code. At least one capability is
2716    * floating around (only THREADED_RTS builds have more than one).
2717    */
2718   initCapabilities();
2719
2720   initTaskManager();
2721
2722 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2723   initSparkPools();
2724 #endif
2725
2726 #if defined(THREADED_RTS)
2727   /*
2728    * Eagerly start one worker to run each Capability, except for
2729    * Capability 0.  The idea is that we're probably going to start a
2730    * bound thread on Capability 0 pretty soon, so we don't want a
2731    * worker task hogging it.
2732    */
2733   { 
2734       nat i;
2735       Capability *cap;
2736       for (i = 1; i < n_capabilities; i++) {
2737           cap = &capabilities[i];
2738           ACQUIRE_LOCK(&cap->lock);
2739           startWorkerTask(cap, workerStart);
2740           RELEASE_LOCK(&cap->lock);
2741       }
2742   }
2743 #endif
2744
2745   RELEASE_LOCK(&sched_mutex);
2746 }
2747
2748 void
2749 exitScheduler( void )
2750 {
2751     interrupted = rtsTrue;
2752     shutting_down_scheduler = rtsTrue;
2753
2754 #if defined(THREADED_RTS)
2755     { 
2756         Task *task;
2757         nat i;
2758         
2759         ACQUIRE_LOCK(&sched_mutex);
2760         task = newBoundTask();
2761         RELEASE_LOCK(&sched_mutex);
2762
2763         for (i = 0; i < n_capabilities; i++) {
2764             shutdownCapability(&capabilities[i], task);
2765         }
2766         boundTaskExiting(task);
2767         stopTaskManager();
2768     }
2769 #endif
2770 }
2771
2772 /* ---------------------------------------------------------------------------
2773    Where are the roots that we know about?
2774
2775         - all the threads on the runnable queue
2776         - all the threads on the blocked queue
2777         - all the threads on the sleeping queue
2778         - all the thread currently executing a _ccall_GC
2779         - all the "main threads"
2780      
2781    ------------------------------------------------------------------------ */
2782
2783 /* This has to be protected either by the scheduler monitor, or by the
2784         garbage collection monitor (probably the latter).
2785         KH @ 25/10/99
2786 */
2787
2788 void
2789 GetRoots( evac_fn evac )
2790 {
2791     nat i;
2792     Capability *cap;
2793     Task *task;
2794
2795 #if defined(GRAN)
2796     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2797         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2798             evac((StgClosure **)&run_queue_hds[i]);
2799         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2800             evac((StgClosure **)&run_queue_tls[i]);
2801         
2802         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2803             evac((StgClosure **)&blocked_queue_hds[i]);
2804         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2805             evac((StgClosure **)&blocked_queue_tls[i]);
2806         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2807             evac((StgClosure **)&ccalling_threads[i]);
2808     }
2809
2810     markEventQueue();
2811
2812 #else /* !GRAN */
2813
2814     for (i = 0; i < n_capabilities; i++) {
2815         cap = &capabilities[i];
2816         evac((StgClosure **)&cap->run_queue_hd);
2817         evac((StgClosure **)&cap->run_queue_tl);
2818         
2819         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2820              task=task->next) {
2821             evac((StgClosure **)&task->suspended_tso);
2822         }
2823     }
2824     
2825 #if !defined(THREADED_RTS)
2826     evac((StgClosure **)(void *)&blocked_queue_hd);
2827     evac((StgClosure **)(void *)&blocked_queue_tl);
2828     evac((StgClosure **)(void *)&sleeping_queue);
2829 #endif 
2830 #endif
2831
2832     // evac((StgClosure **)&blackhole_queue);
2833
2834 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2835     markSparkQueue(evac);
2836 #endif
2837     
2838 #if defined(RTS_USER_SIGNALS)
2839     // mark the signal handlers (signals should be already blocked)
2840     markSignalHandlers(evac);
2841 #endif
2842 }
2843
2844 /* -----------------------------------------------------------------------------
2845    performGC
2846
2847    This is the interface to the garbage collector from Haskell land.
2848    We provide this so that external C code can allocate and garbage
2849    collect when called from Haskell via _ccall_GC.
2850
2851    It might be useful to provide an interface whereby the programmer
2852    can specify more roots (ToDo).
2853    
2854    This needs to be protected by the GC condition variable above.  KH.
2855    -------------------------------------------------------------------------- */
2856
2857 static void (*extra_roots)(evac_fn);
2858
2859 static void
2860 performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
2861 {
2862     Task *task = myTask();
2863
2864     if (task == NULL) {
2865         ACQUIRE_LOCK(&sched_mutex);
2866         task = newBoundTask();
2867         RELEASE_LOCK(&sched_mutex);
2868         scheduleDoGC(NULL,task,force_major, get_roots);
2869         boundTaskExiting(task);
2870     } else {
2871         scheduleDoGC(NULL,task,force_major, get_roots);
2872     }
2873 }
2874
2875 void
2876 performGC(void)
2877 {
2878     performGC_(rtsFalse, GetRoots);
2879 }
2880
2881 void
2882 performMajorGC(void)
2883 {
2884     performGC_(rtsTrue, GetRoots);
2885 }
2886
2887 static void
2888 AllRoots(evac_fn evac)
2889 {
2890     GetRoots(evac);             // the scheduler's roots
2891     extra_roots(evac);          // the user's roots
2892 }
2893
2894 void
2895 performGCWithRoots(void (*get_roots)(evac_fn))
2896 {
2897     extra_roots = get_roots;
2898     performGC_(rtsFalse, AllRoots);
2899 }
2900
2901 /* -----------------------------------------------------------------------------
2902    Stack overflow
2903
2904    If the thread has reached its maximum stack size, then raise the
2905    StackOverflow exception in the offending thread.  Otherwise
2906    relocate the TSO into a larger chunk of memory and adjust its stack
2907    size appropriately.
2908    -------------------------------------------------------------------------- */
2909
2910 static StgTSO *
2911 threadStackOverflow(Capability *cap, StgTSO *tso)
2912 {
2913   nat new_stack_size, stack_words;
2914   lnat new_tso_size;
2915   StgPtr new_sp;
2916   StgTSO *dest;
2917
2918   IF_DEBUG(sanity,checkTSO(tso));
2919   if (tso->stack_size >= tso->max_stack_size) {
2920
2921     IF_DEBUG(gc,
2922              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2923                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2924              /* If we're debugging, just print out the top of the stack */
2925              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2926                                               tso->sp+64)));
2927
2928     /* Send this thread the StackOverflow exception */
2929     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2930     return tso;
2931   }
2932
2933   /* Try to double the current stack size.  If that takes us over the
2934    * maximum stack size for this thread, then use the maximum instead.
2935    * Finally round up so the TSO ends up as a whole number of blocks.
2936    */
2937   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2938   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2939                                        TSO_STRUCT_SIZE)/sizeof(W_);
2940   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2941   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2942
2943   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2944
2945   dest = (StgTSO *)allocate(new_tso_size);
2946   TICK_ALLOC_TSO(new_stack_size,0);
2947
2948   /* copy the TSO block and the old stack into the new area */
2949   memcpy(dest,tso,TSO_STRUCT_SIZE);
2950   stack_words = tso->stack + tso->stack_size - tso->sp;
2951   new_sp = (P_)dest + new_tso_size - stack_words;
2952   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2953
2954   /* relocate the stack pointers... */
2955   dest->sp         = new_sp;
2956   dest->stack_size = new_stack_size;
2957         
2958   /* Mark the old TSO as relocated.  We have to check for relocated
2959    * TSOs in the garbage collector and any primops that deal with TSOs.
2960    *
2961    * It's important to set the sp value to just beyond the end
2962    * of the stack, so we don't attempt to scavenge any part of the
2963    * dead TSO's stack.
2964    */
2965   tso->what_next = ThreadRelocated;
2966   tso->link = dest;
2967   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2968   tso->why_blocked = NotBlocked;
2969
2970   IF_PAR_DEBUG(verbose,
2971                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2972                      tso->id, tso, tso->stack_size);
2973                /* If we're debugging, just print out the top of the stack */
2974                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2975                                                 tso->sp+64)));
2976   
2977   IF_DEBUG(sanity,checkTSO(tso));
2978 #if 0
2979   IF_DEBUG(scheduler,printTSO(dest));
2980 #endif
2981
2982   return dest;
2983 }
2984
2985 /* ---------------------------------------------------------------------------
2986    Wake up a queue that was blocked on some resource.
2987    ------------------------------------------------------------------------ */
2988
2989 #if defined(GRAN)
2990 STATIC_INLINE void
2991 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2992 {
2993 }
2994 #elif defined(PARALLEL_HASKELL)
2995 STATIC_INLINE void
2996 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2997 {
2998   /* write RESUME events to log file and
2999      update blocked and fetch time (depending on type of the orig closure) */
3000   if (RtsFlags.ParFlags.ParStats.Full) {
3001     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
3002                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
3003                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
3004     if (emptyRunQueue())
3005       emitSchedule = rtsTrue;
3006
3007     switch (get_itbl(node)->type) {
3008         case FETCH_ME_BQ:
3009           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3010           break;
3011         case RBH:
3012         case FETCH_ME:
3013         case BLACKHOLE_BQ:
3014           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3015           break;
3016 #ifdef DIST
3017         case MVAR:
3018           break;
3019 #endif    
3020         default:
3021           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
3022         }
3023       }
3024 }
3025 #endif
3026
3027 #if defined(GRAN)
3028 StgBlockingQueueElement *
3029 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3030 {
3031     StgTSO *tso;
3032     PEs node_loc, tso_loc;
3033
3034     node_loc = where_is(node); // should be lifted out of loop
3035     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3036     tso_loc = where_is((StgClosure *)tso);
3037     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3038       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
3039       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3040       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3041       // insertThread(tso, node_loc);
3042       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3043                 ResumeThread,
3044                 tso, node, (rtsSpark*)NULL);
3045       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3046       // len_local++;
3047       // len++;
3048     } else { // TSO is remote (actually should be FMBQ)
3049       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3050                                   RtsFlags.GranFlags.Costs.gunblocktime +
3051                                   RtsFlags.GranFlags.Costs.latency;
3052       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3053                 UnblockThread,
3054                 tso, node, (rtsSpark*)NULL);
3055       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3056       // len++;
3057     }
3058     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3059     IF_GRAN_DEBUG(bq,
3060                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3061                           (node_loc==tso_loc ? "Local" : "Global"), 
3062                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3063     tso->block_info.closure = NULL;
3064     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
3065                              tso->id, tso));
3066 }
3067 #elif defined(PARALLEL_HASKELL)
3068 StgBlockingQueueElement *
3069 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3070 {
3071     StgBlockingQueueElement *next;
3072
3073     switch (get_itbl(bqe)->type) {
3074     case TSO:
3075       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3076       /* if it's a TSO just push it onto the run_queue */
3077       next = bqe->link;
3078       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3079       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3080       threadRunnable();
3081       unblockCount(bqe, node);
3082       /* reset blocking status after dumping event */
3083       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3084       break;
3085
3086     case BLOCKED_FETCH:
3087       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3088       next = bqe->link;
3089       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3090       PendingFetches = (StgBlockedFetch *)bqe;
3091       break;
3092
3093 # if defined(DEBUG)
3094       /* can ignore this case in a non-debugging setup; 
3095          see comments on RBHSave closures above */
3096     case CONSTR:
3097       /* check that the closure is an RBHSave closure */
3098       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3099              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3100              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3101       break;
3102
3103     default:
3104       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3105            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3106            (StgClosure *)bqe);
3107 # endif
3108     }
3109   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3110   return next;
3111 }
3112 #endif
3113
3114 StgTSO *
3115 unblockOne(Capability *cap, StgTSO *tso)
3116 {
3117   StgTSO *next;
3118
3119   ASSERT(get_itbl(tso)->type == TSO);
3120   ASSERT(tso->why_blocked != NotBlocked);
3121   tso->why_blocked = NotBlocked;
3122   next = tso->link;
3123   tso->link = END_TSO_QUEUE;
3124
3125   // We might have just migrated this TSO to our Capability:
3126   if (tso->bound) {
3127       tso->bound->cap = cap;
3128   }
3129
3130   appendToRunQueue(cap,tso);
3131
3132   // we're holding a newly woken thread, make sure we context switch
3133   // quickly so we can migrate it if necessary.
3134   context_switch = 1;
3135   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3136   return next;
3137 }
3138
3139
3140 #if defined(GRAN)
3141 void 
3142 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3143 {
3144   StgBlockingQueueElement *bqe;
3145   PEs node_loc;
3146   nat len = 0; 
3147
3148   IF_GRAN_DEBUG(bq, 
3149                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3150                       node, CurrentProc, CurrentTime[CurrentProc], 
3151                       CurrentTSO->id, CurrentTSO));
3152
3153   node_loc = where_is(node);
3154
3155   ASSERT(q == END_BQ_QUEUE ||
3156          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3157          get_itbl(q)->type == CONSTR); // closure (type constructor)
3158   ASSERT(is_unique(node));
3159
3160   /* FAKE FETCH: magically copy the node to the tso's proc;
3161      no Fetch necessary because in reality the node should not have been 
3162      moved to the other PE in the first place
3163   */
3164   if (CurrentProc!=node_loc) {
3165     IF_GRAN_DEBUG(bq, 
3166                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3167                         node, node_loc, CurrentProc, CurrentTSO->id, 
3168                         // CurrentTSO, where_is(CurrentTSO),
3169                         node->header.gran.procs));
3170     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3171     IF_GRAN_DEBUG(bq, 
3172                   debugBelch("## new bitmask of node %p is %#x\n",
3173                         node, node->header.gran.procs));
3174     if (RtsFlags.GranFlags.GranSimStats.Global) {
3175       globalGranStats.tot_fake_fetches++;
3176     }
3177   }
3178
3179   bqe = q;
3180   // ToDo: check: ASSERT(CurrentProc==node_loc);
3181   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3182     //next = bqe->link;
3183     /* 
3184        bqe points to the current element in the queue
3185        next points to the next element in the queue
3186     */
3187     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3188     //tso_loc = where_is(tso);
3189     len++;
3190     bqe = unblockOne(bqe, node);
3191   }
3192
3193   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3194      the closure to make room for the anchor of the BQ */
3195   if (bqe!=END_BQ_QUEUE) {
3196     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3197     /*
3198     ASSERT((info_ptr==&RBH_Save_0_info) ||
3199            (info_ptr==&RBH_Save_1_info) ||
3200            (info_ptr==&RBH_Save_2_info));
3201     */
3202     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3203     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3204     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3205
3206     IF_GRAN_DEBUG(bq,
3207                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3208                         node, info_type(node)));
3209   }
3210
3211   /* statistics gathering */
3212   if (RtsFlags.GranFlags.GranSimStats.Global) {
3213     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3214     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3215     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3216     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3217   }
3218   IF_GRAN_DEBUG(bq,
3219                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3220                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3221 }
3222 #elif defined(PARALLEL_HASKELL)
3223 void 
3224 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3225 {
3226   StgBlockingQueueElement *bqe;
3227
3228   IF_PAR_DEBUG(verbose, 
3229                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3230                      node, mytid));
3231 #ifdef DIST  
3232   //RFP
3233   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3234     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3235     return;
3236   }
3237 #endif
3238   
3239   ASSERT(q == END_BQ_QUEUE ||
3240          get_itbl(q)->type == TSO ||           
3241          get_itbl(q)->type == BLOCKED_FETCH || 
3242          get_itbl(q)->type == CONSTR); 
3243
3244   bqe = q;
3245   while (get_itbl(bqe)->type==TSO || 
3246          get_itbl(bqe)->type==BLOCKED_FETCH) {
3247     bqe = unblockOne(bqe, node);
3248   }
3249 }
3250
3251 #else   /* !GRAN && !PARALLEL_HASKELL */
3252
3253 void
3254 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3255 {
3256     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3257                              // Exception.cmm
3258     while (tso != END_TSO_QUEUE) {
3259         tso = unblockOne(cap,tso);
3260     }
3261 }
3262 #endif
3263
3264 /* ---------------------------------------------------------------------------
3265    Interrupt execution
3266    - usually called inside a signal handler so it mustn't do anything fancy.   
3267    ------------------------------------------------------------------------ */
3268
3269 void
3270 interruptStgRts(void)
3271 {
3272     interrupted    = 1;
3273     context_switch = 1;
3274 #if defined(THREADED_RTS)
3275     prodAllCapabilities();
3276 #endif
3277 }
3278
3279 /* -----------------------------------------------------------------------------
3280    Unblock a thread
3281
3282    This is for use when we raise an exception in another thread, which
3283    may be blocked.
3284    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3285    -------------------------------------------------------------------------- */
3286
3287 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3288 /*
3289   NB: only the type of the blocking queue is different in GranSim and GUM
3290       the operations on the queue-elements are the same
3291       long live polymorphism!
3292
3293   Locks: sched_mutex is held upon entry and exit.
3294
3295 */
3296 static void
3297 unblockThread(Capability *cap, StgTSO *tso)
3298 {
3299   StgBlockingQueueElement *t, **last;
3300
3301   switch (tso->why_blocked) {
3302
3303   case NotBlocked:
3304     return;  /* not blocked */
3305
3306   case BlockedOnSTM:
3307     // Be careful: nothing to do here!  We tell the scheduler that the thread
3308     // is runnable and we leave it to the stack-walking code to abort the 
3309     // transaction while unwinding the stack.  We should perhaps have a debugging
3310     // test to make sure that this really happens and that the 'zombie' transaction
3311     // does not get committed.
3312     goto done;
3313
3314   case BlockedOnMVar:
3315     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3316     {
3317       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3318       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3319
3320       last = (StgBlockingQueueElement **)&mvar->head;
3321       for (t = (StgBlockingQueueElement *)mvar->head; 
3322            t != END_BQ_QUEUE; 
3323            last = &t->link, last_tso = t, t = t->link) {
3324         if (t == (StgBlockingQueueElement *)tso) {
3325           *last = (StgBlockingQueueElement *)tso->link;
3326           if (mvar->tail == tso) {
3327             mvar->tail = (StgTSO *)last_tso;
3328           }
3329           goto done;
3330         }
3331       }
3332       barf("unblockThread (MVAR): TSO not found");
3333     }
3334
3335   case BlockedOnBlackHole:
3336     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3337     {
3338       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3339
3340       last = &bq->blocking_queue;
3341       for (t = bq->blocking_queue; 
3342            t != END_BQ_QUEUE; 
3343            last = &t->link, t = t->link) {
3344         if (t == (StgBlockingQueueElement *)tso) {
3345           *last = (StgBlockingQueueElement *)tso->link;
3346           goto done;
3347         }
3348       }
3349       barf("unblockThread (BLACKHOLE): TSO not found");
3350     }
3351
3352   case BlockedOnException:
3353     {
3354       StgTSO *target  = tso->block_info.tso;
3355
3356       ASSERT(get_itbl(target)->type == TSO);
3357
3358       if (target->what_next == ThreadRelocated) {
3359           target = target->link;
3360           ASSERT(get_itbl(target)->type == TSO);
3361       }
3362
3363       ASSERT(target->blocked_exceptions != NULL);
3364
3365       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3366       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3367            t != END_BQ_QUEUE; 
3368            last = &t->link, t = t->link) {
3369         ASSERT(get_itbl(t)->type == TSO);
3370         if (t == (StgBlockingQueueElement *)tso) {
3371           *last = (StgBlockingQueueElement *)tso->link;
3372           goto done;
3373         }
3374       }
3375       barf("unblockThread (Exception): TSO not found");
3376     }
3377
3378   case BlockedOnRead:
3379   case BlockedOnWrite:
3380 #if defined(mingw32_HOST_OS)
3381   case BlockedOnDoProc:
3382 #endif
3383     {
3384       /* take TSO off blocked_queue */
3385       StgBlockingQueueElement *prev = NULL;
3386       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3387            prev = t, t = t->link) {
3388         if (t == (StgBlockingQueueElement *)tso) {
3389           if (prev == NULL) {
3390             blocked_queue_hd = (StgTSO *)t->link;
3391             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3392               blocked_queue_tl = END_TSO_QUEUE;
3393             }
3394           } else {
3395             prev->link = t->link;
3396             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3397               blocked_queue_tl = (StgTSO *)prev;
3398             }
3399           }
3400 #if defined(mingw32_HOST_OS)
3401           /* (Cooperatively) signal that the worker thread should abort
3402            * the request.
3403            */
3404           abandonWorkRequest(tso->block_info.async_result->reqID);
3405 #endif
3406           goto done;
3407         }
3408       }
3409       barf("unblockThread (I/O): TSO not found");
3410     }
3411
3412   case BlockedOnDelay:
3413     {
3414       /* take TSO off sleeping_queue */
3415       StgBlockingQueueElement *prev = NULL;
3416       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3417            prev = t, t = t->link) {
3418         if (t == (StgBlockingQueueElement *)tso) {
3419           if (prev == NULL) {
3420             sleeping_queue = (StgTSO *)t->link;
3421           } else {
3422             prev->link = t->link;
3423           }
3424           goto done;
3425         }
3426       }
3427       barf("unblockThread (delay): TSO not found");
3428     }
3429
3430   default:
3431     barf("unblockThread");
3432   }
3433
3434  done:
3435   tso->link = END_TSO_QUEUE;
3436   tso->why_blocked = NotBlocked;
3437   tso->block_info.closure = NULL;
3438   pushOnRunQueue(cap,tso);
3439 }
3440 #else
3441 static void
3442 unblockThread(Capability *cap, StgTSO *tso)
3443 {
3444   StgTSO *t, **last;
3445   
3446   /* To avoid locking unnecessarily. */
3447   if (tso->why_blocked == NotBlocked) {
3448     return;
3449   }
3450
3451   switch (tso->why_blocked) {
3452
3453   case BlockedOnSTM:
3454     // Be careful: nothing to do here!  We tell the scheduler that the thread
3455     // is runnable and we leave it to the stack-walking code to abort the 
3456     // transaction while unwinding the stack.  We should perhaps have a debugging
3457     // test to make sure that this really happens and that the 'zombie' transaction
3458     // does not get committed.
3459     goto done;
3460
3461   case BlockedOnMVar:
3462     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3463     {
3464       StgTSO *last_tso = END_TSO_QUEUE;
3465       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3466
3467       last = &mvar->head;
3468       for (t = mvar->head; t != END_TSO_QUEUE; 
3469            last = &t->link, last_tso = t, t = t->link) {
3470         if (t == tso) {
3471           *last = tso->link;
3472           if (mvar->tail == tso) {
3473             mvar->tail = last_tso;
3474           }
3475           goto done;
3476         }
3477       }
3478       barf("unblockThread (MVAR): TSO not found");
3479     }
3480
3481   case BlockedOnBlackHole:
3482     {
3483       last = &blackhole_queue;
3484       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3485            last = &t->link, t = t->link) {
3486         if (t == tso) {
3487           *last = tso->link;
3488           goto done;
3489         }
3490       }
3491       barf("unblockThread (BLACKHOLE): TSO not found");
3492     }
3493
3494   case BlockedOnException:
3495     {
3496       StgTSO *target  = tso->block_info.tso;
3497
3498       ASSERT(get_itbl(target)->type == TSO);
3499
3500       while (target->what_next == ThreadRelocated) {
3501           target = target->link;
3502           ASSERT(get_itbl(target)->type == TSO);
3503       }
3504       
3505       ASSERT(target->blocked_exceptions != NULL);
3506
3507       last = &target->blocked_exceptions;
3508       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3509            last = &t->link, t = t->link) {
3510         ASSERT(get_itbl(t)->type == TSO);
3511         if (t == tso) {
3512           *last = tso->link;
3513           goto done;
3514         }
3515       }
3516       barf("unblockThread (Exception): TSO not found");
3517     }
3518
3519 #if !defined(THREADED_RTS)
3520   case BlockedOnRead:
3521   case BlockedOnWrite:
3522 #if defined(mingw32_HOST_OS)
3523   case BlockedOnDoProc:
3524 #endif
3525     {
3526       StgTSO *prev = NULL;
3527       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3528            prev = t, t = t->link) {
3529         if (t == tso) {
3530           if (prev == NULL) {
3531             blocked_queue_hd = t->link;
3532             if (blocked_queue_tl == t) {
3533               blocked_queue_tl = END_TSO_QUEUE;
3534             }
3535           } else {
3536             prev->link = t->link;
3537             if (blocked_queue_tl == t) {
3538               blocked_queue_tl = prev;
3539             }
3540           }
3541 #if defined(mingw32_HOST_OS)
3542           /* (Cooperatively) signal that the worker thread should abort
3543            * the request.
3544            */
3545           abandonWorkRequest(tso->block_info.async_result->reqID);
3546 #endif
3547           goto done;
3548         }
3549       }
3550       barf("unblockThread (I/O): TSO not found");
3551     }
3552
3553   case BlockedOnDelay:
3554     {
3555       StgTSO *prev = NULL;
3556       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3557            prev = t, t = t->link) {
3558         if (t == tso) {
3559           if (prev == NULL) {
3560             sleeping_queue = t->link;
3561           } else {
3562             prev->link = t->link;
3563           }
3564           goto done;
3565         }
3566       }
3567       barf("unblockThread (delay): TSO not found");
3568     }
3569 #endif
3570
3571   default:
3572     barf("unblockThread");
3573   }
3574
3575  done:
3576   tso->link = END_TSO_QUEUE;
3577   tso->why_blocked = NotBlocked;
3578   tso->block_info.closure = NULL;
3579   appendToRunQueue(cap,tso);
3580 }
3581 #endif
3582
3583 /* -----------------------------------------------------------------------------
3584  * checkBlackHoles()
3585  *
3586  * Check the blackhole_queue for threads that can be woken up.  We do
3587  * this periodically: before every GC, and whenever the run queue is
3588  * empty.
3589  *
3590  * An elegant solution might be to just wake up all the blocked
3591  * threads with awakenBlockedQueue occasionally: they'll go back to
3592  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3593  * doesn't give us a way to tell whether we've actually managed to
3594  * wake up any threads, so we would be busy-waiting.
3595  *
3596  * -------------------------------------------------------------------------- */
3597
3598 static rtsBool
3599 checkBlackHoles (Capability *cap)
3600 {
3601     StgTSO **prev, *t;
3602     rtsBool any_woke_up = rtsFalse;
3603     StgHalfWord type;
3604
3605     // blackhole_queue is global:
3606     ASSERT_LOCK_HELD(&sched_mutex);
3607
3608     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3609
3610     // ASSUMES: sched_mutex
3611     prev = &blackhole_queue;
3612     t = blackhole_queue;
3613     while (t != END_TSO_QUEUE) {
3614         ASSERT(t->why_blocked == BlockedOnBlackHole);
3615         type = get_itbl(t->block_info.closure)->type;
3616         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3617             IF_DEBUG(sanity,checkTSO(t));
3618             t = unblockOne(cap, t);
3619             // urk, the threads migrate to the current capability
3620             // here, but we'd like to keep them on the original one.
3621             *prev = t;
3622             any_woke_up = rtsTrue;
3623         } else {
3624             prev = &t->link;
3625             t = t->link;
3626         }
3627     }
3628
3629     return any_woke_up;
3630 }
3631
3632 /* -----------------------------------------------------------------------------
3633  * raiseAsync()
3634  *
3635  * The following function implements the magic for raising an
3636  * asynchronous exception in an existing thread.
3637  *
3638  * We first remove the thread from any queue on which it might be
3639  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3640  *
3641  * We strip the stack down to the innermost CATCH_FRAME, building
3642  * thunks in the heap for all the active computations, so they can 
3643  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3644  * an application of the handler to the exception, and push it on
3645  * the top of the stack.
3646  * 
3647  * How exactly do we save all the active computations?  We create an
3648  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3649  * AP_STACKs pushes everything from the corresponding update frame
3650  * upwards onto the stack.  (Actually, it pushes everything up to the
3651  * next update frame plus a pointer to the next AP_STACK object.
3652  * Entering the next AP_STACK object pushes more onto the stack until we
3653  * reach the last AP_STACK object - at which point the stack should look
3654  * exactly as it did when we killed the TSO and we can continue
3655  * execution by entering the closure on top of the stack.
3656  *
3657  * We can also kill a thread entirely - this happens if either (a) the 
3658  * exception passed to raiseAsync is NULL, or (b) there's no
3659  * CATCH_FRAME on the stack.  In either case, we strip the entire
3660  * stack and replace the thread with a zombie.
3661  *
3662  * ToDo: in THREADED_RTS mode, this function is only safe if either
3663  * (a) we hold all the Capabilities (eg. in GC, or if there is only
3664  * one Capability), or (b) we own the Capability that the TSO is
3665  * currently blocked on or on the run queue of.
3666  *
3667  * -------------------------------------------------------------------------- */
3668  
3669 void
3670 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3671 {
3672     raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3673 }
3674
3675 void
3676 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3677 {
3678     raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3679 }
3680
3681 static void
3682 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3683             rtsBool stop_at_atomically, StgPtr stop_here)
3684 {
3685     StgRetInfoTable *info;
3686     StgPtr sp, frame;
3687     nat i;
3688   
3689     // Thread already dead?
3690     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3691         return;
3692     }
3693
3694     IF_DEBUG(scheduler, 
3695              sched_belch("raising exception in thread %ld.", (long)tso->id));
3696     
3697     // Remove it from any blocking queues
3698     unblockThread(cap,tso);
3699
3700     // mark it dirty; we're about to change its stack.
3701     dirtyTSO(tso);
3702
3703     sp = tso->sp;
3704     
3705     // The stack freezing code assumes there's a closure pointer on
3706     // the top of the stack, so we have to arrange that this is the case...
3707     //
3708     if (sp[0] == (W_)&stg_enter_info) {
3709         sp++;
3710     } else {
3711         sp--;
3712         sp[0] = (W_)&stg_dummy_ret_closure;
3713     }
3714
3715     frame = sp + 1;
3716     while (stop_here == NULL || frame < stop_here) {
3717
3718         // 1. Let the top of the stack be the "current closure"
3719         //
3720         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3721         // CATCH_FRAME.
3722         //
3723         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3724         // current closure applied to the chunk of stack up to (but not
3725         // including) the update frame.  This closure becomes the "current
3726         // closure".  Go back to step 2.
3727         //
3728         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3729         // top of the stack applied to the exception.
3730         // 
3731         // 5. If it's a STOP_FRAME, then kill the thread.
3732         // 
3733         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3734         // transaction
3735        
3736         info = get_ret_itbl((StgClosure *)frame);
3737
3738         switch (info->i.type) {
3739
3740         case UPDATE_FRAME:
3741         {
3742             StgAP_STACK * ap;
3743             nat words;
3744             
3745             // First build an AP_STACK consisting of the stack chunk above the
3746             // current update frame, with the top word on the stack as the
3747             // fun field.
3748             //
3749             words = frame - sp - 1;
3750             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3751             
3752             ap->size = words;
3753             ap->fun  = (StgClosure *)sp[0];
3754             sp++;
3755             for(i=0; i < (nat)words; ++i) {
3756                 ap->payload[i] = (StgClosure *)*sp++;
3757             }
3758             
3759             SET_HDR(ap,&stg_AP_STACK_info,
3760                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3761             TICK_ALLOC_UP_THK(words+1,0);
3762             
3763             IF_DEBUG(scheduler,
3764                      debugBelch("sched: Updating ");
3765                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3766                      debugBelch(" with ");
3767                      printObj((StgClosure *)ap);
3768                 );
3769
3770             // Replace the updatee with an indirection
3771             //
3772             // Warning: if we're in a loop, more than one update frame on
3773             // the stack may point to the same object.  Be careful not to
3774             // overwrite an IND_OLDGEN in this case, because we'll screw
3775             // up the mutable lists.  To be on the safe side, don't
3776             // overwrite any kind of indirection at all.  See also
3777             // threadSqueezeStack in GC.c, where we have to make a similar
3778             // check.
3779             //
3780             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3781                 // revert the black hole
3782                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3783                                (StgClosure *)ap);
3784             }
3785             sp += sizeofW(StgUpdateFrame) - 1;
3786             sp[0] = (W_)ap; // push onto stack
3787             frame = sp + 1;
3788             continue; //no need to bump frame
3789         }
3790
3791         case STOP_FRAME:
3792             // We've stripped the entire stack, the thread is now dead.
3793             tso->what_next = ThreadKilled;
3794             tso->sp = frame + sizeofW(StgStopFrame);
3795             return;
3796
3797         case CATCH_FRAME:
3798             // If we find a CATCH_FRAME, and we've got an exception to raise,
3799             // then build the THUNK raise(exception), and leave it on
3800             // top of the CATCH_FRAME ready to enter.
3801             //
3802         {
3803 #ifdef PROFILING
3804             StgCatchFrame *cf = (StgCatchFrame *)frame;
3805 #endif
3806             StgThunk *raise;
3807             
3808             if (exception == NULL) break;
3809
3810             // we've got an exception to raise, so let's pass it to the
3811             // handler in this frame.
3812             //
3813             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3814             TICK_ALLOC_SE_THK(1,0);
3815             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3816             raise->payload[0] = exception;
3817             
3818             // throw away the stack from Sp up to the CATCH_FRAME.
3819             //
3820             sp = frame - 1;
3821             
3822             /* Ensure that async excpetions are blocked now, so we don't get
3823              * a surprise exception before we get around to executing the
3824              * handler.
3825              */
3826             if (tso->blocked_exceptions == NULL) {
3827                 tso->blocked_exceptions = END_TSO_QUEUE;
3828             }
3829
3830             /* Put the newly-built THUNK on top of the stack, ready to execute
3831              * when the thread restarts.
3832              */
3833             sp[0] = (W_)raise;
3834             sp[-1] = (W_)&stg_enter_info;
3835             tso->sp = sp-1;
3836             tso->what_next = ThreadRunGHC;
3837             IF_DEBUG(sanity, checkTSO(tso));
3838             return;
3839         }
3840             
3841         case ATOMICALLY_FRAME:
3842             if (stop_at_atomically) {
3843                 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3844                 stmCondemnTransaction(cap, tso -> trec);
3845 #ifdef REG_R1
3846                 tso->sp = frame;
3847 #else
3848                 // R1 is not a register: the return convention for IO in
3849                 // this case puts the return value on the stack, so we
3850                 // need to set up the stack to return to the atomically
3851                 // frame properly...
3852                 tso->sp = frame - 2;
3853                 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3854                 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3855 #endif
3856                 tso->what_next = ThreadRunGHC;
3857                 return;
3858             }
3859             // Not stop_at_atomically... fall through and abort the
3860             // transaction.
3861             
3862         case CATCH_RETRY_FRAME:
3863             // IF we find an ATOMICALLY_FRAME then we abort the
3864             // current transaction and propagate the exception.  In
3865             // this case (unlike ordinary exceptions) we do not care
3866             // whether the transaction is valid or not because its
3867             // possible validity cannot have caused the exception
3868             // and will not be visible after the abort.
3869             IF_DEBUG(stm,
3870                      debugBelch("Found atomically block delivering async exception\n"));
3871             StgTRecHeader *trec = tso -> trec;
3872             StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3873             stmAbortTransaction(cap, trec);
3874             tso -> trec = outer;
3875             break;
3876             
3877         default:
3878             break;
3879         }
3880
3881         // move on to the next stack frame
3882         frame += stack_frame_sizeW((StgClosure *)frame);
3883     }
3884
3885     // if we got here, then we stopped at stop_here
3886     ASSERT(stop_here != NULL);
3887 }
3888
3889 /* -----------------------------------------------------------------------------
3890    Deleting threads
3891
3892    This is used for interruption (^C) and forking, and corresponds to
3893    raising an exception but without letting the thread catch the
3894    exception.
3895    -------------------------------------------------------------------------- */
3896
3897 static void 
3898 deleteThread (Capability *cap, StgTSO *tso)
3899 {
3900   if (tso->why_blocked != BlockedOnCCall &&
3901       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3902       raiseAsync(cap,tso,NULL);
3903   }
3904 }
3905
3906 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3907 static void 
3908 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3909 { // for forkProcess only:
3910   // delete thread without giving it a chance to catch the KillThread exception
3911
3912   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3913       return;
3914   }
3915
3916   if (tso->why_blocked != BlockedOnCCall &&
3917       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3918       unblockThread(cap,tso);
3919   }
3920
3921   tso->what_next = ThreadKilled;
3922 }
3923 #endif
3924
3925 /* -----------------------------------------------------------------------------
3926    raiseExceptionHelper
3927    
3928    This function is called by the raise# primitve, just so that we can
3929    move some of the tricky bits of raising an exception from C-- into
3930    C.  Who knows, it might be a useful re-useable thing here too.
3931    -------------------------------------------------------------------------- */
3932
3933 StgWord
3934 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3935 {
3936     Capability *cap = regTableToCapability(reg);
3937     StgThunk *raise_closure = NULL;
3938     StgPtr p, next;
3939     StgRetInfoTable *info;
3940     //
3941     // This closure represents the expression 'raise# E' where E
3942     // is the exception raise.  It is used to overwrite all the
3943     // thunks which are currently under evaluataion.
3944     //
3945
3946     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
3947     // LDV profiling: stg_raise_info has THUNK as its closure
3948     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3949     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3950     // 1 does not cause any problem unless profiling is performed.
3951     // However, when LDV profiling goes on, we need to linearly scan
3952     // small object pool, where raise_closure is stored, so we should
3953     // use MIN_UPD_SIZE.
3954     //
3955     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3956     //                                 sizeofW(StgClosure)+1);
3957     //
3958
3959     //
3960     // Walk up the stack, looking for the catch frame.  On the way,
3961     // we update any closures pointed to from update frames with the
3962     // raise closure that we just built.
3963     //
3964     p = tso->sp;
3965     while(1) {
3966         info = get_ret_itbl((StgClosure *)p);
3967         next = p + stack_frame_sizeW((StgClosure *)p);
3968         switch (info->i.type) {
3969             
3970         case UPDATE_FRAME:
3971             // Only create raise_closure if we need to.
3972             if (raise_closure == NULL) {
3973                 raise_closure = 
3974                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3975                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3976                 raise_closure->payload[0] = exception;
3977             }
3978             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3979             p = next;
3980             continue;
3981
3982         case ATOMICALLY_FRAME:
3983             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3984             tso->sp = p;
3985             return ATOMICALLY_FRAME;
3986             
3987         case CATCH_FRAME:
3988             tso->sp = p;
3989             return CATCH_FRAME;
3990
3991         case CATCH_STM_FRAME:
3992             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3993             tso->sp = p;
3994             return CATCH_STM_FRAME;
3995             
3996         case STOP_FRAME:
3997             tso->sp = p;
3998             return STOP_FRAME;
3999
4000         case CATCH_RETRY_FRAME:
4001         default:
4002             p = next; 
4003             continue;
4004         }
4005     }
4006 }
4007
4008
4009 /* -----------------------------------------------------------------------------
4010    findRetryFrameHelper
4011
4012    This function is called by the retry# primitive.  It traverses the stack
4013    leaving tso->sp referring to the frame which should handle the retry.  
4014
4015    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
4016    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
4017
4018    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
4019    despite the similar implementation.
4020
4021    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
4022    not be created within memory transactions.
4023    -------------------------------------------------------------------------- */
4024
4025 StgWord
4026 findRetryFrameHelper (StgTSO *tso)
4027 {
4028   StgPtr           p, next;
4029   StgRetInfoTable *info;
4030
4031   p = tso -> sp;
4032   while (1) {
4033     info = get_ret_itbl((StgClosure *)p);
4034     next = p + stack_frame_sizeW((StgClosure *)p);
4035     switch (info->i.type) {
4036       
4037     case ATOMICALLY_FRAME:
4038       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
4039       tso->sp = p;
4040       return ATOMICALLY_FRAME;
4041       
4042     case CATCH_RETRY_FRAME:
4043       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
4044       tso->sp = p;
4045       return CATCH_RETRY_FRAME;
4046       
4047     case CATCH_STM_FRAME:
4048     default:
4049       ASSERT(info->i.type != CATCH_FRAME);
4050       ASSERT(info->i.type != STOP_FRAME);
4051       p = next; 
4052       continue;
4053     }
4054   }
4055 }
4056
4057 /* -----------------------------------------------------------------------------
4058    resurrectThreads is called after garbage collection on the list of
4059    threads found to be garbage.  Each of these threads will be woken
4060    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4061    on an MVar, or NonTermination if the thread was blocked on a Black
4062    Hole.
4063
4064    Locks: assumes we hold *all* the capabilities.
4065    -------------------------------------------------------------------------- */
4066
4067 void
4068 resurrectThreads (StgTSO *threads)
4069 {
4070     StgTSO *tso, *next;
4071     Capability *cap;
4072
4073     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4074         next = tso->global_link;
4075         tso->global_link = all_threads;
4076         all_threads = tso;
4077         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4078         
4079         // Wake up the thread on the Capability it was last on for a
4080         // bound thread, or last_free_capability otherwise.
4081         if (tso->bound) {
4082             cap = tso->bound->cap;
4083         } else {
4084             cap = last_free_capability;
4085         }
4086         
4087         switch (tso->why_blocked) {
4088         case BlockedOnMVar:
4089         case BlockedOnException:
4090             /* Called by GC - sched_mutex lock is currently held. */
4091             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4092             break;
4093         case BlockedOnBlackHole:
4094             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4095             break;
4096         case BlockedOnSTM:
4097             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4098             break;
4099         case NotBlocked:
4100             /* This might happen if the thread was blocked on a black hole
4101              * belonging to a thread that we've just woken up (raiseAsync
4102              * can wake up threads, remember...).
4103              */
4104             continue;
4105         default:
4106             barf("resurrectThreads: thread blocked in a strange way");
4107         }
4108     }
4109 }
4110
4111 /* ----------------------------------------------------------------------------
4112  * Debugging: why is a thread blocked
4113  * [Also provides useful information when debugging threaded programs
4114  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4115    ------------------------------------------------------------------------- */
4116
4117 #if DEBUG
4118 static void
4119 printThreadBlockage(StgTSO *tso)
4120 {
4121   switch (tso->why_blocked) {
4122   case BlockedOnRead:
4123     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4124     break;
4125   case BlockedOnWrite:
4126     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4127     break;
4128 #if defined(mingw32_HOST_OS)
4129     case BlockedOnDoProc:
4130     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4131     break;
4132 #endif
4133   case BlockedOnDelay:
4134     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4135     break;
4136   case BlockedOnMVar:
4137     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4138     break;
4139   case BlockedOnException:
4140     debugBelch("is blocked on delivering an exception to thread %d",
4141             tso->block_info.tso->id);
4142     break;
4143   case BlockedOnBlackHole:
4144     debugBelch("is blocked on a black hole");
4145     break;
4146   case NotBlocked:
4147     debugBelch("is not blocked");
4148     break;
4149 #if defined(PARALLEL_HASKELL)
4150   case BlockedOnGA:
4151     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4152             tso->block_info.closure, info_type(tso->block_info.closure));
4153     break;
4154   case BlockedOnGA_NoSend:
4155     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4156             tso->block_info.closure, info_type(tso->block_info.closure));
4157     break;
4158 #endif
4159   case BlockedOnCCall:
4160     debugBelch("is blocked on an external call");
4161     break;
4162   case BlockedOnCCall_NoUnblockExc:
4163     debugBelch("is blocked on an external call (exceptions were already blocked)");
4164     break;
4165   case BlockedOnSTM:
4166     debugBelch("is blocked on an STM operation");
4167     break;
4168   default:
4169     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4170          tso->why_blocked, tso->id, tso);
4171   }
4172 }
4173
4174 void
4175 printThreadStatus(StgTSO *t)
4176 {
4177     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4178     {
4179       void *label = lookupThreadLabel(t->id);
4180       if (label) debugBelch("[\"%s\"] ",(char *)label);
4181     }
4182     if (t->what_next == ThreadRelocated) {
4183         debugBelch("has been relocated...\n");
4184     } else {
4185         switch (t->what_next) {
4186         case ThreadKilled:
4187             debugBelch("has been killed");
4188             break;
4189         case ThreadComplete:
4190             debugBelch("has completed");
4191             break;
4192         default:
4193             printThreadBlockage(t);
4194         }
4195         debugBelch("\n");
4196     }
4197 }
4198
4199 void
4200 printAllThreads(void)
4201 {
4202   StgTSO *t, *next;
4203   nat i;
4204   Capability *cap;
4205
4206 # if defined(GRAN)
4207   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4208   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4209                        time_string, rtsFalse/*no commas!*/);
4210
4211   debugBelch("all threads at [%s]:\n", time_string);
4212 # elif defined(PARALLEL_HASKELL)
4213   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4214   ullong_format_string(CURRENT_TIME,
4215                        time_string, rtsFalse/*no commas!*/);
4216
4217   debugBelch("all threads at [%s]:\n", time_string);
4218 # else
4219   debugBelch("all threads:\n");
4220 # endif
4221
4222   for (i = 0; i < n_capabilities; i++) {
4223       cap = &capabilities[i];
4224       debugBelch("threads on capability %d:\n", cap->no);
4225       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4226           printThreadStatus(t);
4227       }
4228   }
4229
4230   debugBelch("other threads:\n");
4231   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4232       if (t->why_blocked != NotBlocked) {
4233           printThreadStatus(t);
4234       }
4235       if (t->what_next == ThreadRelocated) {
4236           next = t->link;
4237       } else {
4238           next = t->global_link;
4239       }
4240   }
4241 }
4242
4243 // useful from gdb
4244 void 
4245 printThreadQueue(StgTSO *t)
4246 {
4247     nat i = 0;
4248     for (; t != END_TSO_QUEUE; t = t->link) {
4249         printThreadStatus(t);
4250         i++;
4251     }
4252     debugBelch("%d threads on queue\n", i);
4253 }
4254
4255 /* 
4256    Print a whole blocking queue attached to node (debugging only).
4257 */
4258 # if defined(PARALLEL_HASKELL)
4259 void 
4260 print_bq (StgClosure *node)
4261 {
4262   StgBlockingQueueElement *bqe;
4263   StgTSO *tso;
4264   rtsBool end;
4265
4266   debugBelch("## BQ of closure %p (%s): ",
4267           node, info_type(node));
4268
4269   /* should cover all closures that may have a blocking queue */
4270   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4271          get_itbl(node)->type == FETCH_ME_BQ ||
4272          get_itbl(node)->type == RBH ||
4273          get_itbl(node)->type == MVAR);
4274     
4275   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4276
4277   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4278 }
4279
4280 /* 
4281    Print a whole blocking queue starting with the element bqe.
4282 */
4283 void 
4284 print_bqe (StgBlockingQueueElement *bqe)
4285 {
4286   rtsBool end;
4287
4288   /* 
4289      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4290   */
4291   for (end = (bqe==END_BQ_QUEUE);
4292        !end; // iterate until bqe points to a CONSTR
4293        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4294        bqe = end ? END_BQ_QUEUE : bqe->link) {
4295     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4296     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4297     /* types of closures that may appear in a blocking queue */
4298     ASSERT(get_itbl(bqe)->type == TSO ||           
4299            get_itbl(bqe)->type == BLOCKED_FETCH || 
4300            get_itbl(bqe)->type == CONSTR); 
4301     /* only BQs of an RBH end with an RBH_Save closure */
4302     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4303
4304     switch (get_itbl(bqe)->type) {
4305     case TSO:
4306       debugBelch(" TSO %u (%x),",
4307               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4308       break;
4309     case BLOCKED_FETCH:
4310       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4311               ((StgBlockedFetch *)bqe)->node, 
4312               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4313               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4314               ((StgBlockedFetch *)bqe)->ga.weight);
4315       break;
4316     case CONSTR:
4317       debugBelch(" %s (IP %p),",
4318               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4319                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4320                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4321                "RBH_Save_?"), get_itbl(bqe));
4322       break;
4323     default:
4324       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4325            info_type((StgClosure *)bqe)); // , node, info_type(node));
4326       break;
4327     }
4328   } /* for */
4329   debugBelch("\n");
4330 }
4331 # elif defined(GRAN)
4332 void 
4333 print_bq (StgClosure *node)
4334 {
4335   StgBlockingQueueElement *bqe;
4336   PEs node_loc, tso_loc;
4337   rtsBool end;
4338
4339   /* should cover all closures that may have a blocking queue */
4340   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4341          get_itbl(node)->type == FETCH_ME_BQ ||
4342          get_itbl(node)->type == RBH);
4343     
4344   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4345   node_loc = where_is(node);
4346
4347   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4348           node, info_type(node), node_loc);
4349
4350   /* 
4351      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4352   */
4353   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4354        !end; // iterate until bqe points to a CONSTR
4355        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4356     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4357     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4358     /* types of closures that may appear in a blocking queue */
4359     ASSERT(get_itbl(bqe)->type == TSO ||           
4360            get_itbl(bqe)->type == CONSTR); 
4361     /* only BQs of an RBH end with an RBH_Save closure */
4362     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4363
4364     tso_loc = where_is((StgClosure *)bqe);
4365     switch (get_itbl(bqe)->type) {
4366     case TSO:
4367       debugBelch(" TSO %d (%p) on [PE %d],",
4368               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4369       break;
4370     case CONSTR:
4371       debugBelch(" %s (IP %p),",
4372               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4373                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4374                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4375                "RBH_Save_?"), get_itbl(bqe));
4376       break;
4377     default:
4378       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4379            info_type((StgClosure *)bqe), node, info_type(node));
4380       break;
4381     }
4382   } /* for */
4383   debugBelch("\n");
4384 }
4385 # endif
4386
4387 #if defined(PARALLEL_HASKELL)
4388 static nat
4389 run_queue_len(void)
4390 {
4391     nat i;
4392     StgTSO *tso;
4393     
4394     for (i=0, tso=run_queue_hd; 
4395          tso != END_TSO_QUEUE;
4396          i++, tso=tso->link) {
4397         /* nothing */
4398     }
4399         
4400     return i;
4401 }
4402 #endif
4403
4404 void
4405 sched_belch(char *s, ...)
4406 {
4407     va_list ap;
4408     va_start(ap,s);
4409 #ifdef THREADED_RTS
4410     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4411 #elif defined(PARALLEL_HASKELL)
4412     debugBelch("== ");
4413 #else
4414     debugBelch("sched: ");
4415 #endif
4416     vdebugBelch(s, ap);
4417     debugBelch("\n");
4418     va_end(ap);
4419 }
4420
4421 #endif /* DEBUG */