add another SMP assertion
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 /* -----------------------------------------------------------------------------
76  * Global variables
77  * -------------------------------------------------------------------------- */
78
79 #if defined(GRAN)
80
81 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
82 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
83
84 /* 
85    In GranSim we have a runnable and a blocked queue for each processor.
86    In order to minimise code changes new arrays run_queue_hds/tls
87    are created. run_queue_hd is then a short cut (macro) for
88    run_queue_hds[CurrentProc] (see GranSim.h).
89    -- HWL
90 */
91 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
92 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
93 StgTSO *ccalling_threadss[MAX_PROC];
94 /* We use the same global list of threads (all_threads) in GranSim as in
95    the std RTS (i.e. we are cheating). However, we don't use this list in
96    the GranSim specific code at the moment (so we are only potentially
97    cheating).  */
98
99 #else /* !GRAN */
100
101 #if !defined(THREADED_RTS)
102 // Blocked/sleeping thrads
103 StgTSO *blocked_queue_hd = NULL;
104 StgTSO *blocked_queue_tl = NULL;
105 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
106 #endif
107
108 /* Threads blocked on blackholes.
109  * LOCK: sched_mutex+capability, or all capabilities
110  */
111 StgTSO *blackhole_queue = NULL;
112 #endif
113
114 /* The blackhole_queue should be checked for threads to wake up.  See
115  * Schedule.h for more thorough comment.
116  * LOCK: none (doesn't matter if we miss an update)
117  */
118 rtsBool blackholes_need_checking = rtsFalse;
119
120 /* Linked list of all threads.
121  * Used for detecting garbage collected threads.
122  * LOCK: sched_mutex+capability, or all capabilities
123  */
124 StgTSO *all_threads = NULL;
125
126 /* flag set by signal handler to precipitate a context switch
127  * LOCK: none (just an advisory flag)
128  */
129 int context_switch = 0;
130
131 /* flag that tracks whether we have done any execution in this time slice.
132  * LOCK: currently none, perhaps we should lock (but needs to be
133  * updated in the fast path of the scheduler).
134  */
135 nat recent_activity = ACTIVITY_YES;
136
137 /* if this flag is set as well, give up execution
138  * LOCK: none (changes once, from false->true)
139  */
140 rtsBool interrupted = rtsFalse;
141
142 /* Next thread ID to allocate.
143  * LOCK: sched_mutex
144  */
145 static StgThreadID next_thread_id = 1;
146
147 /* The smallest stack size that makes any sense is:
148  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
149  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
150  *  + 1                       (the closure to enter)
151  *  + 1                       (stg_ap_v_ret)
152  *  + 1                       (spare slot req'd by stg_ap_v_ret)
153  *
154  * A thread with this stack will bomb immediately with a stack
155  * overflow, which will increase its stack size.  
156  */
157 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
158
159 #if defined(GRAN)
160 StgTSO *CurrentTSO;
161 #endif
162
163 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
164  *  exists - earlier gccs apparently didn't.
165  *  -= chak
166  */
167 StgTSO dummy_tso;
168
169 /*
170  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
171  * in an MT setting, needed to signal that a worker thread shouldn't hang around
172  * in the scheduler when it is out of work.
173  */
174 rtsBool shutting_down_scheduler = rtsFalse;
175
176 /*
177  * This mutex protects most of the global scheduler data in
178  * the THREADED_RTS runtime.
179  */
180 #if defined(THREADED_RTS)
181 Mutex sched_mutex;
182 #endif
183
184 #if defined(PARALLEL_HASKELL)
185 StgTSO *LastTSO;
186 rtsTime TimeOfLastYield;
187 rtsBool emitSchedule = rtsTrue;
188 #endif
189
190 /* -----------------------------------------------------------------------------
191  * static function prototypes
192  * -------------------------------------------------------------------------- */
193
194 static Capability *schedule (Capability *initialCapability, Task *task);
195
196 //
197 // These function all encapsulate parts of the scheduler loop, and are
198 // abstracted only to make the structure and control flow of the
199 // scheduler clearer.
200 //
201 static void schedulePreLoop (void);
202 #if defined(THREADED_RTS)
203 static void schedulePushWork(Capability *cap, Task *task);
204 #endif
205 static void scheduleStartSignalHandlers (Capability *cap);
206 static void scheduleCheckBlockedThreads (Capability *cap);
207 static void scheduleCheckBlackHoles (Capability *cap);
208 static void scheduleDetectDeadlock (Capability *cap, Task *task);
209 #if defined(GRAN)
210 static StgTSO *scheduleProcessEvent(rtsEvent *event);
211 #endif
212 #if defined(PARALLEL_HASKELL)
213 static StgTSO *scheduleSendPendingMessages(void);
214 static void scheduleActivateSpark(void);
215 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
216 #endif
217 #if defined(PAR) || defined(GRAN)
218 static void scheduleGranParReport(void);
219 #endif
220 static void schedulePostRunThread(void);
221 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
222 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
223                                          StgTSO *t);
224 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
225                                     nat prev_what_next );
226 static void scheduleHandleThreadBlocked( StgTSO *t );
227 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
228                                              StgTSO *t );
229 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
230 static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major,
231                          void (*get_roots)(evac_fn));
232
233 static void unblockThread(Capability *cap, StgTSO *tso);
234 static rtsBool checkBlackHoles(Capability *cap);
235 static void AllRoots(evac_fn evac);
236
237 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
238
239 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
240                         rtsBool stop_at_atomically, StgPtr stop_here);
241
242 static void deleteThread (Capability *cap, StgTSO *tso);
243 static void deleteRunQueue (Capability *cap);
244
245 #ifdef DEBUG
246 static void printThreadBlockage(StgTSO *tso);
247 static void printThreadStatus(StgTSO *tso);
248 void printThreadQueue(StgTSO *tso);
249 #endif
250
251 #if defined(PARALLEL_HASKELL)
252 StgTSO * createSparkThread(rtsSpark spark);
253 StgTSO * activateSpark (rtsSpark spark);  
254 #endif
255
256 #ifdef DEBUG
257 static char *whatNext_strs[] = {
258   "(unknown)",
259   "ThreadRunGHC",
260   "ThreadInterpret",
261   "ThreadKilled",
262   "ThreadRelocated",
263   "ThreadComplete"
264 };
265 #endif
266
267 /* -----------------------------------------------------------------------------
268  * Putting a thread on the run queue: different scheduling policies
269  * -------------------------------------------------------------------------- */
270
271 STATIC_INLINE void
272 addToRunQueue( Capability *cap, StgTSO *t )
273 {
274 #if defined(PARALLEL_HASKELL)
275     if (RtsFlags.ParFlags.doFairScheduling) { 
276         // this does round-robin scheduling; good for concurrency
277         appendToRunQueue(cap,t);
278     } else {
279         // this does unfair scheduling; good for parallelism
280         pushOnRunQueue(cap,t);
281     }
282 #else
283     // this does round-robin scheduling; good for concurrency
284     appendToRunQueue(cap,t);
285 #endif
286 }
287
288 /* ---------------------------------------------------------------------------
289    Main scheduling loop.
290
291    We use round-robin scheduling, each thread returning to the
292    scheduler loop when one of these conditions is detected:
293
294       * out of heap space
295       * timer expires (thread yields)
296       * thread blocks
297       * thread ends
298       * stack overflow
299
300    GRAN version:
301      In a GranSim setup this loop iterates over the global event queue.
302      This revolves around the global event queue, which determines what 
303      to do next. Therefore, it's more complicated than either the 
304      concurrent or the parallel (GUM) setup.
305
306    GUM version:
307      GUM iterates over incoming messages.
308      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
309      and sends out a fish whenever it has nothing to do; in-between
310      doing the actual reductions (shared code below) it processes the
311      incoming messages and deals with delayed operations 
312      (see PendingFetches).
313      This is not the ugliest code you could imagine, but it's bloody close.
314
315    ------------------------------------------------------------------------ */
316
317 static Capability *
318 schedule (Capability *initialCapability, Task *task)
319 {
320   StgTSO *t;
321   Capability *cap;
322   StgThreadReturnCode ret;
323 #if defined(GRAN)
324   rtsEvent *event;
325 #elif defined(PARALLEL_HASKELL)
326   StgTSO *tso;
327   GlobalTaskId pe;
328   rtsBool receivedFinish = rtsFalse;
329 # if defined(DEBUG)
330   nat tp_size, sp_size; // stats only
331 # endif
332 #endif
333   nat prev_what_next;
334   rtsBool ready_to_gc;
335 #if defined(THREADED_RTS)
336   rtsBool first = rtsTrue;
337 #endif
338   
339   cap = initialCapability;
340
341   // Pre-condition: this task owns initialCapability.
342   // The sched_mutex is *NOT* held
343   // NB. on return, we still hold a capability.
344
345   IF_DEBUG(scheduler,
346            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
347                        task, initialCapability);
348       );
349
350   schedulePreLoop();
351
352   // -----------------------------------------------------------
353   // Scheduler loop starts here:
354
355 #if defined(PARALLEL_HASKELL)
356 #define TERMINATION_CONDITION        (!receivedFinish)
357 #elif defined(GRAN)
358 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
359 #else
360 #define TERMINATION_CONDITION        rtsTrue
361 #endif
362
363   while (TERMINATION_CONDITION) {
364
365 #if defined(GRAN)
366       /* Choose the processor with the next event */
367       CurrentProc = event->proc;
368       CurrentTSO = event->tso;
369 #endif
370
371 #if defined(THREADED_RTS)
372       if (first) {
373           // don't yield the first time, we want a chance to run this
374           // thread for a bit, even if there are others banging at the
375           // door.
376           first = rtsFalse;
377           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
378       } else {
379           // Yield the capability to higher-priority tasks if necessary.
380           yieldCapability(&cap, task);
381       }
382 #endif
383       
384 #if defined(THREADED_RTS)
385       schedulePushWork(cap,task);
386 #endif
387
388     // Check whether we have re-entered the RTS from Haskell without
389     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
390     // call).
391     if (cap->in_haskell) {
392           errorBelch("schedule: re-entered unsafely.\n"
393                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
394           stg_exit(EXIT_FAILURE);
395     }
396
397     //
398     // Test for interruption.  If interrupted==rtsTrue, then either
399     // we received a keyboard interrupt (^C), or the scheduler is
400     // trying to shut down all the tasks (shutting_down_scheduler) in
401     // the threaded RTS.
402     //
403     if (interrupted) {
404         deleteRunQueue(cap);
405 #if defined(THREADED_RTS)
406         discardSparksCap(cap);
407 #endif
408         if (shutting_down_scheduler) {
409             IF_DEBUG(scheduler, sched_belch("shutting down"));
410             // If we are a worker, just exit.  If we're a bound thread
411             // then we will exit below when we've removed our TSO from
412             // the run queue.
413             if (task->tso == NULL && emptyRunQueue(cap)) {
414                 return cap;
415             }
416         } else {
417             IF_DEBUG(scheduler, sched_belch("interrupted"));
418         }
419     }
420
421 #if defined(THREADED_RTS)
422     // If the run queue is empty, take a spark and turn it into a thread.
423     {
424         if (emptyRunQueue(cap)) {
425             StgClosure *spark;
426             spark = findSpark(cap);
427             if (spark != NULL) {
428                 IF_DEBUG(scheduler,
429                          sched_belch("turning spark of closure %p into a thread",
430                                      (StgClosure *)spark));
431                 createSparkThread(cap,spark);     
432             }
433         }
434     }
435 #endif // THREADED_RTS
436
437     scheduleStartSignalHandlers(cap);
438
439     // Only check the black holes here if we've nothing else to do.
440     // During normal execution, the black hole list only gets checked
441     // at GC time, to avoid repeatedly traversing this possibly long
442     // list each time around the scheduler.
443     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
444
445     scheduleCheckBlockedThreads(cap);
446
447     scheduleDetectDeadlock(cap,task);
448 #if defined(THREADED_RTS)
449     cap = task->cap;    // reload cap, it might have changed
450 #endif
451
452     // Normally, the only way we can get here with no threads to
453     // run is if a keyboard interrupt received during 
454     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
455     // Additionally, it is not fatal for the
456     // threaded RTS to reach here with no threads to run.
457     //
458     // win32: might be here due to awaitEvent() being abandoned
459     // as a result of a console event having been delivered.
460     if ( emptyRunQueue(cap) ) {
461 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
462         ASSERT(interrupted);
463 #endif
464         continue; // nothing to do
465     }
466
467 #if defined(PARALLEL_HASKELL)
468     scheduleSendPendingMessages();
469     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
470         continue;
471
472 #if defined(SPARKS)
473     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
474 #endif
475
476     /* If we still have no work we need to send a FISH to get a spark
477        from another PE */
478     if (emptyRunQueue(cap)) {
479         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
480         ASSERT(rtsFalse); // should not happen at the moment
481     }
482     // from here: non-empty run queue.
483     //  TODO: merge above case with this, only one call processMessages() !
484     if (PacketsWaiting()) {  /* process incoming messages, if
485                                 any pending...  only in else
486                                 because getRemoteWork waits for
487                                 messages as well */
488         receivedFinish = processMessages();
489     }
490 #endif
491
492 #if defined(GRAN)
493     scheduleProcessEvent(event);
494 #endif
495
496     // 
497     // Get a thread to run
498     //
499     t = popRunQueue(cap);
500
501 #if defined(GRAN) || defined(PAR)
502     scheduleGranParReport(); // some kind of debuging output
503 #else
504     // Sanity check the thread we're about to run.  This can be
505     // expensive if there is lots of thread switching going on...
506     IF_DEBUG(sanity,checkTSO(t));
507 #endif
508
509 #if defined(THREADED_RTS)
510     // Check whether we can run this thread in the current task.
511     // If not, we have to pass our capability to the right task.
512     {
513         Task *bound = t->bound;
514       
515         if (bound) {
516             if (bound == task) {
517                 IF_DEBUG(scheduler,
518                          sched_belch("### Running thread %d in bound thread",
519                                      t->id));
520                 // yes, the Haskell thread is bound to the current native thread
521             } else {
522                 IF_DEBUG(scheduler,
523                          sched_belch("### thread %d bound to another OS thread",
524                                      t->id));
525                 // no, bound to a different Haskell thread: pass to that thread
526                 pushOnRunQueue(cap,t);
527                 continue;
528             }
529         } else {
530             // The thread we want to run is unbound.
531             if (task->tso) { 
532                 IF_DEBUG(scheduler,
533                          sched_belch("### this OS thread cannot run thread %d", t->id));
534                 // no, the current native thread is bound to a different
535                 // Haskell thread, so pass it to any worker thread
536                 pushOnRunQueue(cap,t);
537                 continue; 
538             }
539         }
540     }
541 #endif
542
543     cap->r.rCurrentTSO = t;
544     
545     /* context switches are initiated by the timer signal, unless
546      * the user specified "context switch as often as possible", with
547      * +RTS -C0
548      */
549     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
550         && !emptyThreadQueues(cap)) {
551         context_switch = 1;
552     }
553          
554 run_thread:
555
556     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
557                               (long)t->id, whatNext_strs[t->what_next]));
558
559 #if defined(PROFILING)
560     startHeapProfTimer();
561 #endif
562
563     // ----------------------------------------------------------------------
564     // Run the current thread 
565
566     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
567
568     prev_what_next = t->what_next;
569
570     errno = t->saved_errno;
571     cap->in_haskell = rtsTrue;
572
573     dirtyTSO(t);
574
575     recent_activity = ACTIVITY_YES;
576
577     switch (prev_what_next) {
578         
579     case ThreadKilled:
580     case ThreadComplete:
581         /* Thread already finished, return to scheduler. */
582         ret = ThreadFinished;
583         break;
584         
585     case ThreadRunGHC:
586     {
587         StgRegTable *r;
588         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
589         cap = regTableToCapability(r);
590         ret = r->rRet;
591         break;
592     }
593     
594     case ThreadInterpret:
595         cap = interpretBCO(cap);
596         ret = cap->r.rRet;
597         break;
598         
599     default:
600         barf("schedule: invalid what_next field");
601     }
602
603     cap->in_haskell = rtsFalse;
604
605     // The TSO might have moved, eg. if it re-entered the RTS and a GC
606     // happened.  So find the new location:
607     t = cap->r.rCurrentTSO;
608
609     // We have run some Haskell code: there might be blackhole-blocked
610     // threads to wake up now.
611     // Lock-free test here should be ok, we're just setting a flag.
612     if ( blackhole_queue != END_TSO_QUEUE ) {
613         blackholes_need_checking = rtsTrue;
614     }
615     
616     // And save the current errno in this thread.
617     // XXX: possibly bogus for SMP because this thread might already
618     // be running again, see code below.
619     t->saved_errno = errno;
620
621 #if defined(THREADED_RTS)
622     // If ret is ThreadBlocked, and this Task is bound to the TSO that
623     // blocked, we are in limbo - the TSO is now owned by whatever it
624     // is blocked on, and may in fact already have been woken up,
625     // perhaps even on a different Capability.  It may be the case
626     // that task->cap != cap.  We better yield this Capability
627     // immediately and return to normaility.
628     if (ret == ThreadBlocked) {
629         IF_DEBUG(scheduler,
630                  sched_belch("--<< thread %d (%s) stopped: blocked\n",
631                              t->id, whatNext_strs[t->what_next]));
632         continue;
633     }
634 #endif
635
636     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
637
638     // ----------------------------------------------------------------------
639     
640     // Costs for the scheduler are assigned to CCS_SYSTEM
641 #if defined(PROFILING)
642     stopHeapProfTimer();
643     CCCS = CCS_SYSTEM;
644 #endif
645     
646 #if defined(THREADED_RTS)
647     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
648 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
649     IF_DEBUG(scheduler,debugBelch("sched: "););
650 #endif
651     
652     schedulePostRunThread();
653
654     ready_to_gc = rtsFalse;
655
656     switch (ret) {
657     case HeapOverflow:
658         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
659         break;
660
661     case StackOverflow:
662         scheduleHandleStackOverflow(cap,task,t);
663         break;
664
665     case ThreadYielding:
666         if (scheduleHandleYield(cap, t, prev_what_next)) {
667             // shortcut for switching between compiler/interpreter:
668             goto run_thread; 
669         }
670         break;
671
672     case ThreadBlocked:
673         scheduleHandleThreadBlocked(t);
674         break;
675
676     case ThreadFinished:
677         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
678         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
679         break;
680
681     default:
682       barf("schedule: invalid thread return code %d", (int)ret);
683     }
684
685     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
686     if (ready_to_gc) {
687       scheduleDoGC(cap,task,rtsFalse,GetRoots);
688 #if defined(THREADED_RTS)
689       cap = task->cap;    // reload cap, it might have changed  
690 #endif
691     }
692   } /* end of while() */
693
694   IF_PAR_DEBUG(verbose,
695                debugBelch("== Leaving schedule() after having received Finish\n"));
696 }
697
698 /* ----------------------------------------------------------------------------
699  * Setting up the scheduler loop
700  * ------------------------------------------------------------------------- */
701
702 static void
703 schedulePreLoop(void)
704 {
705 #if defined(GRAN) 
706     /* set up first event to get things going */
707     /* ToDo: assign costs for system setup and init MainTSO ! */
708     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
709               ContinueThread, 
710               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
711     
712     IF_DEBUG(gran,
713              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
714                         CurrentTSO);
715              G_TSO(CurrentTSO, 5));
716     
717     if (RtsFlags.GranFlags.Light) {
718         /* Save current time; GranSim Light only */
719         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
720     }      
721 #endif
722 }
723
724 /* -----------------------------------------------------------------------------
725  * schedulePushWork()
726  *
727  * Push work to other Capabilities if we have some.
728  * -------------------------------------------------------------------------- */
729
730 #if defined(THREADED_RTS)
731 static void
732 schedulePushWork(Capability *cap USED_IF_THREADS, 
733                  Task *task      USED_IF_THREADS)
734 {
735     Capability *free_caps[n_capabilities], *cap0;
736     nat i, n_free_caps;
737
738     // Check whether we have more threads on our run queue, or sparks
739     // in our pool, that we could hand to another Capability.
740     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
741         && sparkPoolSizeCap(cap) < 2) {
742         return;
743     }
744
745     // First grab as many free Capabilities as we can.
746     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
747         cap0 = &capabilities[i];
748         if (cap != cap0 && tryGrabCapability(cap0,task)) {
749             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
750                 // it already has some work, we just grabbed it at 
751                 // the wrong moment.  Or maybe it's deadlocked!
752                 releaseCapability(cap0);
753             } else {
754                 free_caps[n_free_caps++] = cap0;
755             }
756         }
757     }
758
759     // we now have n_free_caps free capabilities stashed in
760     // free_caps[].  Share our run queue equally with them.  This is
761     // probably the simplest thing we could do; improvements we might
762     // want to do include:
763     //
764     //   - giving high priority to moving relatively new threads, on 
765     //     the gournds that they haven't had time to build up a
766     //     working set in the cache on this CPU/Capability.
767     //
768     //   - giving low priority to moving long-lived threads
769
770     if (n_free_caps > 0) {
771         StgTSO *prev, *t, *next;
772         rtsBool pushed_to_all;
773
774         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
775
776         i = 0;
777         pushed_to_all = rtsFalse;
778
779         if (cap->run_queue_hd != END_TSO_QUEUE) {
780             prev = cap->run_queue_hd;
781             t = prev->link;
782             prev->link = END_TSO_QUEUE;
783             for (; t != END_TSO_QUEUE; t = next) {
784                 next = t->link;
785                 t->link = END_TSO_QUEUE;
786                 if (t->what_next == ThreadRelocated
787                     || t->bound == task) { // don't move my bound thread
788                     prev->link = t;
789                     prev = t;
790                 } else if (i == n_free_caps) {
791                     pushed_to_all = rtsTrue;
792                     i = 0;
793                     // keep one for us
794                     prev->link = t;
795                     prev = t;
796                 } else {
797                     IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
798                     appendToRunQueue(free_caps[i],t);
799                     if (t->bound) { t->bound->cap = free_caps[i]; }
800                     i++;
801                 }
802             }
803             cap->run_queue_tl = prev;
804         }
805
806         // If there are some free capabilities that we didn't push any
807         // threads to, then try to push a spark to each one.
808         if (!pushed_to_all) {
809             StgClosure *spark;
810             // i is the next free capability to push to
811             for (; i < n_free_caps; i++) {
812                 if (emptySparkPoolCap(free_caps[i])) {
813                     spark = findSpark(cap);
814                     if (spark != NULL) {
815                         IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
816                         newSpark(&(free_caps[i]->r), spark);
817                     }
818                 }
819             }
820         }
821
822         // release the capabilities
823         for (i = 0; i < n_free_caps; i++) {
824             task->cap = free_caps[i];
825             releaseCapability(free_caps[i]);
826         }
827     }
828     task->cap = cap; // reset to point to our Capability.
829 }
830 #endif
831
832 /* ----------------------------------------------------------------------------
833  * Start any pending signal handlers
834  * ------------------------------------------------------------------------- */
835
836 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
837 static void
838 scheduleStartSignalHandlers(Capability *cap)
839 {
840     if (signals_pending()) { // safe outside the lock
841         startSignalHandlers(cap);
842     }
843 }
844 #else
845 static void
846 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
847 {
848 }
849 #endif
850
851 /* ----------------------------------------------------------------------------
852  * Check for blocked threads that can be woken up.
853  * ------------------------------------------------------------------------- */
854
855 static void
856 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
857 {
858 #if !defined(THREADED_RTS)
859     //
860     // Check whether any waiting threads need to be woken up.  If the
861     // run queue is empty, and there are no other tasks running, we
862     // can wait indefinitely for something to happen.
863     //
864     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
865     {
866         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
867     }
868 #endif
869 }
870
871
872 /* ----------------------------------------------------------------------------
873  * Check for threads blocked on BLACKHOLEs that can be woken up
874  * ------------------------------------------------------------------------- */
875 static void
876 scheduleCheckBlackHoles (Capability *cap)
877 {
878     if ( blackholes_need_checking ) // check without the lock first
879     {
880         ACQUIRE_LOCK(&sched_mutex);
881         if ( blackholes_need_checking ) {
882             checkBlackHoles(cap);
883             blackholes_need_checking = rtsFalse;
884         }
885         RELEASE_LOCK(&sched_mutex);
886     }
887 }
888
889 /* ----------------------------------------------------------------------------
890  * Detect deadlock conditions and attempt to resolve them.
891  * ------------------------------------------------------------------------- */
892
893 static void
894 scheduleDetectDeadlock (Capability *cap, Task *task)
895 {
896
897 #if defined(PARALLEL_HASKELL)
898     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
899     return;
900 #endif
901
902     /* 
903      * Detect deadlock: when we have no threads to run, there are no
904      * threads blocked, waiting for I/O, or sleeping, and all the
905      * other tasks are waiting for work, we must have a deadlock of
906      * some description.
907      */
908     if ( emptyThreadQueues(cap) )
909     {
910 #if defined(THREADED_RTS)
911         /* 
912          * In the threaded RTS, we only check for deadlock if there
913          * has been no activity in a complete timeslice.  This means
914          * we won't eagerly start a full GC just because we don't have
915          * any threads to run currently.
916          */
917         if (recent_activity != ACTIVITY_INACTIVE) return;
918 #endif
919
920         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
921
922         // Garbage collection can release some new threads due to
923         // either (a) finalizers or (b) threads resurrected because
924         // they are unreachable and will therefore be sent an
925         // exception.  Any threads thus released will be immediately
926         // runnable.
927         scheduleDoGC( cap, task, rtsTrue/*force  major GC*/, GetRoots );
928 #if defined(THREADED_RTS)
929         cap = task->cap;    // reload cap, it might have changed
930 #endif
931
932         recent_activity = ACTIVITY_DONE_GC;
933         
934         if ( !emptyRunQueue(cap) ) return;
935
936 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
937         /* If we have user-installed signal handlers, then wait
938          * for signals to arrive rather then bombing out with a
939          * deadlock.
940          */
941         if ( anyUserHandlers() ) {
942             IF_DEBUG(scheduler, 
943                      sched_belch("still deadlocked, waiting for signals..."));
944
945             awaitUserSignals();
946
947             if (signals_pending()) {
948                 startSignalHandlers(cap);
949             }
950
951             // either we have threads to run, or we were interrupted:
952             ASSERT(!emptyRunQueue(cap) || interrupted);
953         }
954 #endif
955
956 #if !defined(THREADED_RTS)
957         /* Probably a real deadlock.  Send the current main thread the
958          * Deadlock exception.
959          */
960         if (task->tso) {
961             switch (task->tso->why_blocked) {
962             case BlockedOnSTM:
963             case BlockedOnBlackHole:
964             case BlockedOnException:
965             case BlockedOnMVar:
966                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
967                 return;
968             default:
969                 barf("deadlock: main thread blocked in a strange way");
970             }
971         }
972         return;
973 #endif
974     }
975 }
976
977 /* ----------------------------------------------------------------------------
978  * Process an event (GRAN only)
979  * ------------------------------------------------------------------------- */
980
981 #if defined(GRAN)
982 static StgTSO *
983 scheduleProcessEvent(rtsEvent *event)
984 {
985     StgTSO *t;
986
987     if (RtsFlags.GranFlags.Light)
988       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
989
990     /* adjust time based on time-stamp */
991     if (event->time > CurrentTime[CurrentProc] &&
992         event->evttype != ContinueThread)
993       CurrentTime[CurrentProc] = event->time;
994     
995     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
996     if (!RtsFlags.GranFlags.Light)
997       handleIdlePEs();
998
999     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1000
1001     /* main event dispatcher in GranSim */
1002     switch (event->evttype) {
1003       /* Should just be continuing execution */
1004     case ContinueThread:
1005       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1006       /* ToDo: check assertion
1007       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1008              run_queue_hd != END_TSO_QUEUE);
1009       */
1010       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1011       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1012           procStatus[CurrentProc]==Fetching) {
1013         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1014               CurrentTSO->id, CurrentTSO, CurrentProc);
1015         goto next_thread;
1016       } 
1017       /* Ignore ContinueThreads for completed threads */
1018       if (CurrentTSO->what_next == ThreadComplete) {
1019         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1020               CurrentTSO->id, CurrentTSO, CurrentProc);
1021         goto next_thread;
1022       } 
1023       /* Ignore ContinueThreads for threads that are being migrated */
1024       if (PROCS(CurrentTSO)==Nowhere) { 
1025         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1026               CurrentTSO->id, CurrentTSO, CurrentProc);
1027         goto next_thread;
1028       }
1029       /* The thread should be at the beginning of the run queue */
1030       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1031         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1032               CurrentTSO->id, CurrentTSO, CurrentProc);
1033         break; // run the thread anyway
1034       }
1035       /*
1036       new_event(proc, proc, CurrentTime[proc],
1037                 FindWork,
1038                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1039       goto next_thread; 
1040       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1041       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1042
1043     case FetchNode:
1044       do_the_fetchnode(event);
1045       goto next_thread;             /* handle next event in event queue  */
1046       
1047     case GlobalBlock:
1048       do_the_globalblock(event);
1049       goto next_thread;             /* handle next event in event queue  */
1050       
1051     case FetchReply:
1052       do_the_fetchreply(event);
1053       goto next_thread;             /* handle next event in event queue  */
1054       
1055     case UnblockThread:   /* Move from the blocked queue to the tail of */
1056       do_the_unblock(event);
1057       goto next_thread;             /* handle next event in event queue  */
1058       
1059     case ResumeThread:  /* Move from the blocked queue to the tail of */
1060       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1061       event->tso->gran.blocktime += 
1062         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1063       do_the_startthread(event);
1064       goto next_thread;             /* handle next event in event queue  */
1065       
1066     case StartThread:
1067       do_the_startthread(event);
1068       goto next_thread;             /* handle next event in event queue  */
1069       
1070     case MoveThread:
1071       do_the_movethread(event);
1072       goto next_thread;             /* handle next event in event queue  */
1073       
1074     case MoveSpark:
1075       do_the_movespark(event);
1076       goto next_thread;             /* handle next event in event queue  */
1077       
1078     case FindWork:
1079       do_the_findwork(event);
1080       goto next_thread;             /* handle next event in event queue  */
1081       
1082     default:
1083       barf("Illegal event type %u\n", event->evttype);
1084     }  /* switch */
1085     
1086     /* This point was scheduler_loop in the old RTS */
1087
1088     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1089
1090     TimeOfLastEvent = CurrentTime[CurrentProc];
1091     TimeOfNextEvent = get_time_of_next_event();
1092     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1093     // CurrentTSO = ThreadQueueHd;
1094
1095     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1096                          TimeOfNextEvent));
1097
1098     if (RtsFlags.GranFlags.Light) 
1099       GranSimLight_leave_system(event, &ActiveTSO); 
1100
1101     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1102
1103     IF_DEBUG(gran, 
1104              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1105
1106     /* in a GranSim setup the TSO stays on the run queue */
1107     t = CurrentTSO;
1108     /* Take a thread from the run queue. */
1109     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1110
1111     IF_DEBUG(gran, 
1112              debugBelch("GRAN: About to run current thread, which is\n");
1113              G_TSO(t,5));
1114
1115     context_switch = 0; // turned on via GranYield, checking events and time slice
1116
1117     IF_DEBUG(gran, 
1118              DumpGranEvent(GR_SCHEDULE, t));
1119
1120     procStatus[CurrentProc] = Busy;
1121 }
1122 #endif // GRAN
1123
1124 /* ----------------------------------------------------------------------------
1125  * Send pending messages (PARALLEL_HASKELL only)
1126  * ------------------------------------------------------------------------- */
1127
1128 #if defined(PARALLEL_HASKELL)
1129 static StgTSO *
1130 scheduleSendPendingMessages(void)
1131 {
1132     StgSparkPool *pool;
1133     rtsSpark spark;
1134     StgTSO *t;
1135
1136 # if defined(PAR) // global Mem.Mgmt., omit for now
1137     if (PendingFetches != END_BF_QUEUE) {
1138         processFetches();
1139     }
1140 # endif
1141     
1142     if (RtsFlags.ParFlags.BufferTime) {
1143         // if we use message buffering, we must send away all message
1144         // packets which have become too old...
1145         sendOldBuffers(); 
1146     }
1147 }
1148 #endif
1149
1150 /* ----------------------------------------------------------------------------
1151  * Activate spark threads (PARALLEL_HASKELL only)
1152  * ------------------------------------------------------------------------- */
1153
1154 #if defined(PARALLEL_HASKELL)
1155 static void
1156 scheduleActivateSpark(void)
1157 {
1158 #if defined(SPARKS)
1159   ASSERT(emptyRunQueue());
1160 /* We get here if the run queue is empty and want some work.
1161    We try to turn a spark into a thread, and add it to the run queue,
1162    from where it will be picked up in the next iteration of the scheduler
1163    loop.
1164 */
1165
1166       /* :-[  no local threads => look out for local sparks */
1167       /* the spark pool for the current PE */
1168       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1169       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1170           pool->hd < pool->tl) {
1171         /* 
1172          * ToDo: add GC code check that we really have enough heap afterwards!!
1173          * Old comment:
1174          * If we're here (no runnable threads) and we have pending
1175          * sparks, we must have a space problem.  Get enough space
1176          * to turn one of those pending sparks into a
1177          * thread... 
1178          */
1179
1180         spark = findSpark(rtsFalse);            /* get a spark */
1181         if (spark != (rtsSpark) NULL) {
1182           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1183           IF_PAR_DEBUG(fish, // schedule,
1184                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1185                              tso->id, tso, advisory_thread_count));
1186
1187           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1188             IF_PAR_DEBUG(fish, // schedule,
1189                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1190                             spark));
1191             return rtsFalse; /* failed to generate a thread */
1192           }                  /* otherwise fall through & pick-up new tso */
1193         } else {
1194           IF_PAR_DEBUG(fish, // schedule,
1195                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1196                              spark_queue_len(pool)));
1197           return rtsFalse;  /* failed to generate a thread */
1198         }
1199         return rtsTrue;  /* success in generating a thread */
1200   } else { /* no more threads permitted or pool empty */
1201     return rtsFalse;  /* failed to generateThread */
1202   }
1203 #else
1204   tso = NULL; // avoid compiler warning only
1205   return rtsFalse;  /* dummy in non-PAR setup */
1206 #endif // SPARKS
1207 }
1208 #endif // PARALLEL_HASKELL
1209
1210 /* ----------------------------------------------------------------------------
1211  * Get work from a remote node (PARALLEL_HASKELL only)
1212  * ------------------------------------------------------------------------- */
1213     
1214 #if defined(PARALLEL_HASKELL)
1215 static rtsBool
1216 scheduleGetRemoteWork(rtsBool *receivedFinish)
1217 {
1218   ASSERT(emptyRunQueue());
1219
1220   if (RtsFlags.ParFlags.BufferTime) {
1221         IF_PAR_DEBUG(verbose, 
1222                 debugBelch("...send all pending data,"));
1223         {
1224           nat i;
1225           for (i=1; i<=nPEs; i++)
1226             sendImmediately(i); // send all messages away immediately
1227         }
1228   }
1229 # ifndef SPARKS
1230         //++EDEN++ idle() , i.e. send all buffers, wait for work
1231         // suppress fishing in EDEN... just look for incoming messages
1232         // (blocking receive)
1233   IF_PAR_DEBUG(verbose, 
1234                debugBelch("...wait for incoming messages...\n"));
1235   *receivedFinish = processMessages(); // blocking receive...
1236
1237         // and reenter scheduling loop after having received something
1238         // (return rtsFalse below)
1239
1240 # else /* activate SPARKS machinery */
1241 /* We get here, if we have no work, tried to activate a local spark, but still
1242    have no work. We try to get a remote spark, by sending a FISH message.
1243    Thread migration should be added here, and triggered when a sequence of 
1244    fishes returns without work. */
1245         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1246
1247       /* =8-[  no local sparks => look for work on other PEs */
1248         /*
1249          * We really have absolutely no work.  Send out a fish
1250          * (there may be some out there already), and wait for
1251          * something to arrive.  We clearly can't run any threads
1252          * until a SCHEDULE or RESUME arrives, and so that's what
1253          * we're hoping to see.  (Of course, we still have to
1254          * respond to other types of messages.)
1255          */
1256         rtsTime now = msTime() /*CURRENT_TIME*/;
1257         IF_PAR_DEBUG(verbose, 
1258                      debugBelch("--  now=%ld\n", now));
1259         IF_PAR_DEBUG(fish, // verbose,
1260              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1261                  (last_fish_arrived_at!=0 &&
1262                   last_fish_arrived_at+delay > now)) {
1263                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1264                      now, last_fish_arrived_at+delay, 
1265                      last_fish_arrived_at,
1266                      delay);
1267              });
1268   
1269         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1270             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1271           if (last_fish_arrived_at==0 ||
1272               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1273             /* outstandingFishes is set in sendFish, processFish;
1274                avoid flooding system with fishes via delay */
1275     next_fish_to_send_at = 0;  
1276   } else {
1277     /* ToDo: this should be done in the main scheduling loop to avoid the
1278              busy wait here; not so bad if fish delay is very small  */
1279     int iq = 0; // DEBUGGING -- HWL
1280     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1281     /* send a fish when ready, but process messages that arrive in the meantime */
1282     do {
1283       if (PacketsWaiting()) {
1284         iq++; // DEBUGGING
1285         *receivedFinish = processMessages();
1286       }
1287       now = msTime();
1288     } while (!*receivedFinish || now<next_fish_to_send_at);
1289     // JB: This means the fish could become obsolete, if we receive
1290     // work. Better check for work again? 
1291     // last line: while (!receivedFinish || !haveWork || now<...)
1292     // next line: if (receivedFinish || haveWork )
1293
1294     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1295       return rtsFalse;  // NB: this will leave scheduler loop
1296                         // immediately after return!
1297                           
1298     IF_PAR_DEBUG(fish, // verbose,
1299                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1300
1301   }
1302
1303     // JB: IMHO, this should all be hidden inside sendFish(...)
1304     /* pe = choosePE(); 
1305        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1306                 NEW_FISH_HUNGER);
1307
1308     // Global statistics: count no. of fishes
1309     if (RtsFlags.ParFlags.ParStats.Global &&
1310          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1311            globalParStats.tot_fish_mess++;
1312            }
1313     */ 
1314
1315   /* delayed fishes must have been sent by now! */
1316   next_fish_to_send_at = 0;  
1317   }
1318       
1319   *receivedFinish = processMessages();
1320 # endif /* SPARKS */
1321
1322  return rtsFalse;
1323  /* NB: this function always returns rtsFalse, meaning the scheduler
1324     loop continues with the next iteration; 
1325     rationale: 
1326       return code means success in finding work; we enter this function
1327       if there is no local work, thus have to send a fish which takes
1328       time until it arrives with work; in the meantime we should process
1329       messages in the main loop;
1330  */
1331 }
1332 #endif // PARALLEL_HASKELL
1333
1334 /* ----------------------------------------------------------------------------
1335  * PAR/GRAN: Report stats & debugging info(?)
1336  * ------------------------------------------------------------------------- */
1337
1338 #if defined(PAR) || defined(GRAN)
1339 static void
1340 scheduleGranParReport(void)
1341 {
1342   ASSERT(run_queue_hd != END_TSO_QUEUE);
1343
1344   /* Take a thread from the run queue, if we have work */
1345   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1346
1347     /* If this TSO has got its outport closed in the meantime, 
1348      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1349      * It has to be marked as TH_DEAD for this purpose.
1350      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1351
1352 JB: TODO: investigate wether state change field could be nuked
1353      entirely and replaced by the normal tso state (whatnext
1354      field). All we want to do is to kill tsos from outside.
1355      */
1356
1357     /* ToDo: write something to the log-file
1358     if (RTSflags.ParFlags.granSimStats && !sameThread)
1359         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1360
1361     CurrentTSO = t;
1362     */
1363     /* the spark pool for the current PE */
1364     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1365
1366     IF_DEBUG(scheduler, 
1367              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1368                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1369
1370     IF_PAR_DEBUG(fish,
1371              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1372                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1373
1374     if (RtsFlags.ParFlags.ParStats.Full && 
1375         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1376         (emitSchedule || // forced emit
1377          (t && LastTSO && t->id != LastTSO->id))) {
1378       /* 
1379          we are running a different TSO, so write a schedule event to log file
1380          NB: If we use fair scheduling we also have to write  a deschedule 
1381              event for LastTSO; with unfair scheduling we know that the
1382              previous tso has blocked whenever we switch to another tso, so
1383              we don't need it in GUM for now
1384       */
1385       IF_PAR_DEBUG(fish, // schedule,
1386                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1387
1388       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1389                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1390       emitSchedule = rtsFalse;
1391     }
1392 }     
1393 #endif
1394
1395 /* ----------------------------------------------------------------------------
1396  * After running a thread...
1397  * ------------------------------------------------------------------------- */
1398
1399 static void
1400 schedulePostRunThread(void)
1401 {
1402 #if defined(PAR)
1403     /* HACK 675: if the last thread didn't yield, make sure to print a 
1404        SCHEDULE event to the log file when StgRunning the next thread, even
1405        if it is the same one as before */
1406     LastTSO = t; 
1407     TimeOfLastYield = CURRENT_TIME;
1408 #endif
1409
1410   /* some statistics gathering in the parallel case */
1411
1412 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1413   switch (ret) {
1414     case HeapOverflow:
1415 # if defined(GRAN)
1416       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1417       globalGranStats.tot_heapover++;
1418 # elif defined(PAR)
1419       globalParStats.tot_heapover++;
1420 # endif
1421       break;
1422
1423      case StackOverflow:
1424 # if defined(GRAN)
1425       IF_DEBUG(gran, 
1426                DumpGranEvent(GR_DESCHEDULE, t));
1427       globalGranStats.tot_stackover++;
1428 # elif defined(PAR)
1429       // IF_DEBUG(par, 
1430       // DumpGranEvent(GR_DESCHEDULE, t);
1431       globalParStats.tot_stackover++;
1432 # endif
1433       break;
1434
1435     case ThreadYielding:
1436 # if defined(GRAN)
1437       IF_DEBUG(gran, 
1438                DumpGranEvent(GR_DESCHEDULE, t));
1439       globalGranStats.tot_yields++;
1440 # elif defined(PAR)
1441       // IF_DEBUG(par, 
1442       // DumpGranEvent(GR_DESCHEDULE, t);
1443       globalParStats.tot_yields++;
1444 # endif
1445       break; 
1446
1447     case ThreadBlocked:
1448 # if defined(GRAN)
1449       IF_DEBUG(scheduler,
1450                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1451                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1452                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1453                if (t->block_info.closure!=(StgClosure*)NULL)
1454                  print_bq(t->block_info.closure);
1455                debugBelch("\n"));
1456
1457       // ??? needed; should emit block before
1458       IF_DEBUG(gran, 
1459                DumpGranEvent(GR_DESCHEDULE, t)); 
1460       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1461       /*
1462         ngoq Dogh!
1463       ASSERT(procStatus[CurrentProc]==Busy || 
1464               ((procStatus[CurrentProc]==Fetching) && 
1465               (t->block_info.closure!=(StgClosure*)NULL)));
1466       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1467           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1468             procStatus[CurrentProc]==Fetching)) 
1469         procStatus[CurrentProc] = Idle;
1470       */
1471 # elif defined(PAR)
1472 //++PAR++  blockThread() writes the event (change?)
1473 # endif
1474     break;
1475
1476   case ThreadFinished:
1477     break;
1478
1479   default:
1480     barf("parGlobalStats: unknown return code");
1481     break;
1482     }
1483 #endif
1484 }
1485
1486 /* -----------------------------------------------------------------------------
1487  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1488  * -------------------------------------------------------------------------- */
1489
1490 static rtsBool
1491 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1492 {
1493     // did the task ask for a large block?
1494     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1495         // if so, get one and push it on the front of the nursery.
1496         bdescr *bd;
1497         lnat blocks;
1498         
1499         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1500         
1501         IF_DEBUG(scheduler,
1502                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1503                             (long)t->id, whatNext_strs[t->what_next], blocks));
1504         
1505         // don't do this if the nursery is (nearly) full, we'll GC first.
1506         if (cap->r.rCurrentNursery->link != NULL ||
1507             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1508                                                // if the nursery has only one block.
1509             
1510             ACQUIRE_SM_LOCK
1511             bd = allocGroup( blocks );
1512             RELEASE_SM_LOCK
1513             cap->r.rNursery->n_blocks += blocks;
1514             
1515             // link the new group into the list
1516             bd->link = cap->r.rCurrentNursery;
1517             bd->u.back = cap->r.rCurrentNursery->u.back;
1518             if (cap->r.rCurrentNursery->u.back != NULL) {
1519                 cap->r.rCurrentNursery->u.back->link = bd;
1520             } else {
1521 #if !defined(THREADED_RTS)
1522                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1523                        g0s0 == cap->r.rNursery);
1524 #endif
1525                 cap->r.rNursery->blocks = bd;
1526             }             
1527             cap->r.rCurrentNursery->u.back = bd;
1528             
1529             // initialise it as a nursery block.  We initialise the
1530             // step, gen_no, and flags field of *every* sub-block in
1531             // this large block, because this is easier than making
1532             // sure that we always find the block head of a large
1533             // block whenever we call Bdescr() (eg. evacuate() and
1534             // isAlive() in the GC would both have to do this, at
1535             // least).
1536             { 
1537                 bdescr *x;
1538                 for (x = bd; x < bd + blocks; x++) {
1539                     x->step = cap->r.rNursery;
1540                     x->gen_no = 0;
1541                     x->flags = 0;
1542                 }
1543             }
1544             
1545             // This assert can be a killer if the app is doing lots
1546             // of large block allocations.
1547             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1548             
1549             // now update the nursery to point to the new block
1550             cap->r.rCurrentNursery = bd;
1551             
1552             // we might be unlucky and have another thread get on the
1553             // run queue before us and steal the large block, but in that
1554             // case the thread will just end up requesting another large
1555             // block.
1556             pushOnRunQueue(cap,t);
1557             return rtsFalse;  /* not actually GC'ing */
1558         }
1559     }
1560     
1561     IF_DEBUG(scheduler,
1562              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1563                         (long)t->id, whatNext_strs[t->what_next]));
1564 #if defined(GRAN)
1565     ASSERT(!is_on_queue(t,CurrentProc));
1566 #elif defined(PARALLEL_HASKELL)
1567     /* Currently we emit a DESCHEDULE event before GC in GUM.
1568        ToDo: either add separate event to distinguish SYSTEM time from rest
1569        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1570     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1571         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1572                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1573         emitSchedule = rtsTrue;
1574     }
1575 #endif
1576       
1577     pushOnRunQueue(cap,t);
1578     return rtsTrue;
1579     /* actual GC is done at the end of the while loop in schedule() */
1580 }
1581
1582 /* -----------------------------------------------------------------------------
1583  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1584  * -------------------------------------------------------------------------- */
1585
1586 static void
1587 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1588 {
1589     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1590                                   (long)t->id, whatNext_strs[t->what_next]));
1591     /* just adjust the stack for this thread, then pop it back
1592      * on the run queue.
1593      */
1594     { 
1595         /* enlarge the stack */
1596         StgTSO *new_t = threadStackOverflow(cap, t);
1597         
1598         /* The TSO attached to this Task may have moved, so update the
1599          * pointer to it.
1600          */
1601         if (task->tso == t) {
1602             task->tso = new_t;
1603         }
1604         pushOnRunQueue(cap,new_t);
1605     }
1606 }
1607
1608 /* -----------------------------------------------------------------------------
1609  * Handle a thread that returned to the scheduler with ThreadYielding
1610  * -------------------------------------------------------------------------- */
1611
1612 static rtsBool
1613 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1614 {
1615     // Reset the context switch flag.  We don't do this just before
1616     // running the thread, because that would mean we would lose ticks
1617     // during GC, which can lead to unfair scheduling (a thread hogs
1618     // the CPU because the tick always arrives during GC).  This way
1619     // penalises threads that do a lot of allocation, but that seems
1620     // better than the alternative.
1621     context_switch = 0;
1622     
1623     /* put the thread back on the run queue.  Then, if we're ready to
1624      * GC, check whether this is the last task to stop.  If so, wake
1625      * up the GC thread.  getThread will block during a GC until the
1626      * GC is finished.
1627      */
1628     IF_DEBUG(scheduler,
1629              if (t->what_next != prev_what_next) {
1630                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1631                             (long)t->id, whatNext_strs[t->what_next]);
1632              } else {
1633                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1634                             (long)t->id, whatNext_strs[t->what_next]);
1635              }
1636         );
1637     
1638     IF_DEBUG(sanity,
1639              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1640              checkTSO(t));
1641     ASSERT(t->link == END_TSO_QUEUE);
1642     
1643     // Shortcut if we're just switching evaluators: don't bother
1644     // doing stack squeezing (which can be expensive), just run the
1645     // thread.
1646     if (t->what_next != prev_what_next) {
1647         return rtsTrue;
1648     }
1649     
1650 #if defined(GRAN)
1651     ASSERT(!is_on_queue(t,CurrentProc));
1652       
1653     IF_DEBUG(sanity,
1654              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1655              checkThreadQsSanity(rtsTrue));
1656
1657 #endif
1658
1659     addToRunQueue(cap,t);
1660
1661 #if defined(GRAN)
1662     /* add a ContinueThread event to actually process the thread */
1663     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1664               ContinueThread,
1665               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1666     IF_GRAN_DEBUG(bq, 
1667                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1668                   G_EVENTQ(0);
1669                   G_CURR_THREADQ(0));
1670 #endif
1671     return rtsFalse;
1672 }
1673
1674 /* -----------------------------------------------------------------------------
1675  * Handle a thread that returned to the scheduler with ThreadBlocked
1676  * -------------------------------------------------------------------------- */
1677
1678 static void
1679 scheduleHandleThreadBlocked( StgTSO *t
1680 #if !defined(GRAN) && !defined(DEBUG)
1681     STG_UNUSED
1682 #endif
1683     )
1684 {
1685 #if defined(GRAN)
1686     IF_DEBUG(scheduler,
1687              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1688                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1689              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1690     
1691     // ??? needed; should emit block before
1692     IF_DEBUG(gran, 
1693              DumpGranEvent(GR_DESCHEDULE, t)); 
1694     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1695     /*
1696       ngoq Dogh!
1697       ASSERT(procStatus[CurrentProc]==Busy || 
1698       ((procStatus[CurrentProc]==Fetching) && 
1699       (t->block_info.closure!=(StgClosure*)NULL)));
1700       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1701       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1702       procStatus[CurrentProc]==Fetching)) 
1703       procStatus[CurrentProc] = Idle;
1704     */
1705 #elif defined(PAR)
1706     IF_DEBUG(scheduler,
1707              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1708                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1709     IF_PAR_DEBUG(bq,
1710                  
1711                  if (t->block_info.closure!=(StgClosure*)NULL) 
1712                  print_bq(t->block_info.closure));
1713     
1714     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1715     blockThread(t);
1716     
1717     /* whatever we schedule next, we must log that schedule */
1718     emitSchedule = rtsTrue;
1719     
1720 #else /* !GRAN */
1721
1722       // We don't need to do anything.  The thread is blocked, and it
1723       // has tidied up its stack and placed itself on whatever queue
1724       // it needs to be on.
1725
1726 #if !defined(THREADED_RTS)
1727     ASSERT(t->why_blocked != NotBlocked);
1728              // This might not be true under THREADED_RTS: we don't have
1729              // exclusive access to this TSO, so someone might have
1730              // woken it up by now.  This actually happens: try
1731              // conc023 +RTS -N2.
1732 #endif
1733
1734     IF_DEBUG(scheduler,
1735              debugBelch("--<< thread %d (%s) stopped: ", 
1736                         t->id, whatNext_strs[t->what_next]);
1737              printThreadBlockage(t);
1738              debugBelch("\n"));
1739     
1740     /* Only for dumping event to log file 
1741        ToDo: do I need this in GranSim, too?
1742        blockThread(t);
1743     */
1744 #endif
1745 }
1746
1747 /* -----------------------------------------------------------------------------
1748  * Handle a thread that returned to the scheduler with ThreadFinished
1749  * -------------------------------------------------------------------------- */
1750
1751 static rtsBool
1752 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1753 {
1754     /* Need to check whether this was a main thread, and if so,
1755      * return with the return value.
1756      *
1757      * We also end up here if the thread kills itself with an
1758      * uncaught exception, see Exception.cmm.
1759      */
1760     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1761                                   t->id, whatNext_strs[t->what_next]));
1762
1763 #if defined(GRAN)
1764       endThread(t, CurrentProc); // clean-up the thread
1765 #elif defined(PARALLEL_HASKELL)
1766       /* For now all are advisory -- HWL */
1767       //if(t->priority==AdvisoryPriority) ??
1768       advisory_thread_count--; // JB: Caution with this counter, buggy!
1769       
1770 # if defined(DIST)
1771       if(t->dist.priority==RevalPriority)
1772         FinishReval(t);
1773 # endif
1774     
1775 # if defined(EDENOLD)
1776       // the thread could still have an outport... (BUG)
1777       if (t->eden.outport != -1) {
1778       // delete the outport for the tso which has finished...
1779         IF_PAR_DEBUG(eden_ports,
1780                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1781                               t->eden.outport, t->id));
1782         deleteOPT(t);
1783       }
1784       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1785       if (t->eden.epid != -1) {
1786         IF_PAR_DEBUG(eden_ports,
1787                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1788                            t->id, t->eden.epid));
1789         removeTSOfromProcess(t);
1790       }
1791 # endif 
1792
1793 # if defined(PAR)
1794       if (RtsFlags.ParFlags.ParStats.Full &&
1795           !RtsFlags.ParFlags.ParStats.Suppressed) 
1796         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1797
1798       //  t->par only contains statistics: left out for now...
1799       IF_PAR_DEBUG(fish,
1800                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1801                               t->id,t,t->par.sparkname));
1802 # endif
1803 #endif // PARALLEL_HASKELL
1804
1805       //
1806       // Check whether the thread that just completed was a bound
1807       // thread, and if so return with the result.  
1808       //
1809       // There is an assumption here that all thread completion goes
1810       // through this point; we need to make sure that if a thread
1811       // ends up in the ThreadKilled state, that it stays on the run
1812       // queue so it can be dealt with here.
1813       //
1814
1815       if (t->bound) {
1816
1817           if (t->bound != task) {
1818 #if !defined(THREADED_RTS)
1819               // Must be a bound thread that is not the topmost one.  Leave
1820               // it on the run queue until the stack has unwound to the
1821               // point where we can deal with this.  Leaving it on the run
1822               // queue also ensures that the garbage collector knows about
1823               // this thread and its return value (it gets dropped from the
1824               // all_threads list so there's no other way to find it).
1825               appendToRunQueue(cap,t);
1826               return rtsFalse;
1827 #else
1828               // this cannot happen in the threaded RTS, because a
1829               // bound thread can only be run by the appropriate Task.
1830               barf("finished bound thread that isn't mine");
1831 #endif
1832           }
1833
1834           ASSERT(task->tso == t);
1835
1836           if (t->what_next == ThreadComplete) {
1837               if (task->ret) {
1838                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1839                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1840               }
1841               task->stat = Success;
1842           } else {
1843               if (task->ret) {
1844                   *(task->ret) = NULL;
1845               }
1846               if (interrupted) {
1847                   task->stat = Interrupted;
1848               } else {
1849                   task->stat = Killed;
1850               }
1851           }
1852 #ifdef DEBUG
1853           removeThreadLabel((StgWord)task->tso->id);
1854 #endif
1855           return rtsTrue; // tells schedule() to return
1856       }
1857
1858       return rtsFalse;
1859 }
1860
1861 /* -----------------------------------------------------------------------------
1862  * Perform a heap census, if PROFILING
1863  * -------------------------------------------------------------------------- */
1864
1865 static rtsBool
1866 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1867 {
1868 #if defined(PROFILING)
1869     // When we have +RTS -i0 and we're heap profiling, do a census at
1870     // every GC.  This lets us get repeatable runs for debugging.
1871     if (performHeapProfile ||
1872         (RtsFlags.ProfFlags.profileInterval==0 &&
1873          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1874
1875         // checking black holes is necessary before GC, otherwise
1876         // there may be threads that are unreachable except by the
1877         // blackhole queue, which the GC will consider to be
1878         // deadlocked.
1879         scheduleCheckBlackHoles(&MainCapability);
1880
1881         IF_DEBUG(scheduler, sched_belch("garbage collecting before heap census"));
1882         GarbageCollect(GetRoots, rtsTrue);
1883
1884         IF_DEBUG(scheduler, sched_belch("performing heap census"));
1885         heapCensus();
1886
1887         performHeapProfile = rtsFalse;
1888         return rtsTrue;  // true <=> we already GC'd
1889     }
1890 #endif
1891     return rtsFalse;
1892 }
1893
1894 /* -----------------------------------------------------------------------------
1895  * Perform a garbage collection if necessary
1896  * -------------------------------------------------------------------------- */
1897
1898 static void
1899 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
1900               rtsBool force_major, void (*get_roots)(evac_fn))
1901 {
1902     StgTSO *t;
1903 #ifdef THREADED_RTS
1904     static volatile StgWord waiting_for_gc;
1905     rtsBool was_waiting;
1906     nat i;
1907 #endif
1908
1909 #ifdef THREADED_RTS
1910     // In order to GC, there must be no threads running Haskell code.
1911     // Therefore, the GC thread needs to hold *all* the capabilities,
1912     // and release them after the GC has completed.  
1913     //
1914     // This seems to be the simplest way: previous attempts involved
1915     // making all the threads with capabilities give up their
1916     // capabilities and sleep except for the *last* one, which
1917     // actually did the GC.  But it's quite hard to arrange for all
1918     // the other tasks to sleep and stay asleep.
1919     //
1920         
1921     was_waiting = cas(&waiting_for_gc, 0, 1);
1922     if (was_waiting) {
1923         do {
1924             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1925             if (cap) yieldCapability(&cap,task);
1926         } while (waiting_for_gc);
1927         return;  // NOTE: task->cap might have changed here
1928     }
1929
1930     for (i=0; i < n_capabilities; i++) {
1931         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1932         if (cap != &capabilities[i]) {
1933             Capability *pcap = &capabilities[i];
1934             // we better hope this task doesn't get migrated to
1935             // another Capability while we're waiting for this one.
1936             // It won't, because load balancing happens while we have
1937             // all the Capabilities, but even so it's a slightly
1938             // unsavoury invariant.
1939             task->cap = pcap;
1940             context_switch = 1;
1941             waitForReturnCapability(&pcap, task);
1942             if (pcap != &capabilities[i]) {
1943                 barf("scheduleDoGC: got the wrong capability");
1944             }
1945         }
1946     }
1947
1948     waiting_for_gc = rtsFalse;
1949 #endif
1950
1951     /* Kick any transactions which are invalid back to their
1952      * atomically frames.  When next scheduled they will try to
1953      * commit, this commit will fail and they will retry.
1954      */
1955     { 
1956         StgTSO *next;
1957
1958         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1959             if (t->what_next == ThreadRelocated) {
1960                 next = t->link;
1961             } else {
1962                 next = t->global_link;
1963                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1964                     if (!stmValidateNestOfTransactions (t -> trec)) {
1965                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1966                         
1967                         // strip the stack back to the
1968                         // ATOMICALLY_FRAME, aborting the (nested)
1969                         // transaction, and saving the stack of any
1970                         // partially-evaluated thunks on the heap.
1971                         raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL);
1972                         
1973 #ifdef REG_R1
1974                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1975 #endif
1976                     }
1977                 }
1978             }
1979         }
1980     }
1981     
1982     // so this happens periodically:
1983     if (cap) scheduleCheckBlackHoles(cap);
1984     
1985     IF_DEBUG(scheduler, printAllThreads());
1986
1987     /* everybody back, start the GC.
1988      * Could do it in this thread, or signal a condition var
1989      * to do it in another thread.  Either way, we need to
1990      * broadcast on gc_pending_cond afterward.
1991      */
1992 #if defined(THREADED_RTS)
1993     IF_DEBUG(scheduler,sched_belch("doing GC"));
1994 #endif
1995     GarbageCollect(get_roots, force_major);
1996     
1997 #if defined(THREADED_RTS)
1998     // release our stash of capabilities.
1999     for (i = 0; i < n_capabilities; i++) {
2000         if (cap != &capabilities[i]) {
2001             task->cap = &capabilities[i];
2002             releaseCapability(&capabilities[i]);
2003         }
2004     }
2005     if (cap) {
2006         task->cap = cap;
2007     } else {
2008         task->cap = NULL;
2009     }
2010 #endif
2011
2012 #if defined(GRAN)
2013     /* add a ContinueThread event to continue execution of current thread */
2014     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2015               ContinueThread,
2016               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2017     IF_GRAN_DEBUG(bq, 
2018                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2019                   G_EVENTQ(0);
2020                   G_CURR_THREADQ(0));
2021 #endif /* GRAN */
2022 }
2023
2024 /* ---------------------------------------------------------------------------
2025  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
2026  * used by Control.Concurrent for error checking.
2027  * ------------------------------------------------------------------------- */
2028  
2029 StgBool
2030 rtsSupportsBoundThreads(void)
2031 {
2032 #if defined(THREADED_RTS)
2033   return rtsTrue;
2034 #else
2035   return rtsFalse;
2036 #endif
2037 }
2038
2039 /* ---------------------------------------------------------------------------
2040  * isThreadBound(tso): check whether tso is bound to an OS thread.
2041  * ------------------------------------------------------------------------- */
2042  
2043 StgBool
2044 isThreadBound(StgTSO* tso USED_IF_THREADS)
2045 {
2046 #if defined(THREADED_RTS)
2047   return (tso->bound != NULL);
2048 #endif
2049   return rtsFalse;
2050 }
2051
2052 /* ---------------------------------------------------------------------------
2053  * Singleton fork(). Do not copy any running threads.
2054  * ------------------------------------------------------------------------- */
2055
2056 #if !defined(mingw32_HOST_OS)
2057 #define FORKPROCESS_PRIMOP_SUPPORTED
2058 #endif
2059
2060 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2061 static void 
2062 deleteThreadImmediately(Capability *cap, StgTSO *tso);
2063 #endif
2064 StgInt
2065 forkProcess(HsStablePtr *entry
2066 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2067             STG_UNUSED
2068 #endif
2069            )
2070 {
2071 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2072     Task *task;
2073     pid_t pid;
2074     StgTSO* t,*next;
2075     Capability *cap;
2076     
2077 #if defined(THREADED_RTS)
2078     if (RtsFlags.ParFlags.nNodes > 1) {
2079         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2080         stg_exit(EXIT_FAILURE);
2081     }
2082 #endif
2083
2084     IF_DEBUG(scheduler,sched_belch("forking!"));
2085     
2086     // ToDo: for SMP, we should probably acquire *all* the capabilities
2087     cap = rts_lock();
2088     
2089     pid = fork();
2090     
2091     if (pid) { // parent
2092         
2093         // just return the pid
2094         rts_unlock(cap);
2095         return pid;
2096         
2097     } else { // child
2098         
2099         // delete all threads
2100         cap->run_queue_hd = END_TSO_QUEUE;
2101         cap->run_queue_tl = END_TSO_QUEUE;
2102         
2103         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2104             next = t->link;
2105             
2106             // don't allow threads to catch the ThreadKilled exception
2107             deleteThreadImmediately(cap,t);
2108         }
2109         
2110         // wipe the task list
2111         ACQUIRE_LOCK(&sched_mutex);
2112         for (task = all_tasks; task != NULL; task=task->all_link) {
2113             if (task != cap->running_task) discardTask(task);
2114         }
2115         RELEASE_LOCK(&sched_mutex);
2116
2117         cap->suspended_ccalling_tasks = NULL;
2118
2119 #if defined(THREADED_RTS)
2120         // wipe our spare workers list.
2121         cap->spare_workers = NULL;
2122         cap->returning_tasks_hd = NULL;
2123         cap->returning_tasks_tl = NULL;
2124 #endif
2125
2126         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2127         rts_checkSchedStatus("forkProcess",cap);
2128         
2129         rts_unlock(cap);
2130         hs_exit();                      // clean up and exit
2131         stg_exit(EXIT_SUCCESS);
2132     }
2133 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2134     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2135     return -1;
2136 #endif
2137 }
2138
2139 /* ---------------------------------------------------------------------------
2140  * Delete the threads on the run queue of the current capability.
2141  * ------------------------------------------------------------------------- */
2142    
2143 static void
2144 deleteRunQueue (Capability *cap)
2145 {
2146     StgTSO *t, *next;
2147     for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2148         ASSERT(t->what_next != ThreadRelocated);
2149         next = t->link;
2150         deleteThread(cap, t);
2151     }
2152 }
2153
2154 /* startThread and  insertThread are now in GranSim.c -- HWL */
2155
2156
2157 /* -----------------------------------------------------------------------------
2158    Managing the suspended_ccalling_tasks list.
2159    Locks required: sched_mutex
2160    -------------------------------------------------------------------------- */
2161
2162 STATIC_INLINE void
2163 suspendTask (Capability *cap, Task *task)
2164 {
2165     ASSERT(task->next == NULL && task->prev == NULL);
2166     task->next = cap->suspended_ccalling_tasks;
2167     task->prev = NULL;
2168     if (cap->suspended_ccalling_tasks) {
2169         cap->suspended_ccalling_tasks->prev = task;
2170     }
2171     cap->suspended_ccalling_tasks = task;
2172 }
2173
2174 STATIC_INLINE void
2175 recoverSuspendedTask (Capability *cap, Task *task)
2176 {
2177     if (task->prev) {
2178         task->prev->next = task->next;
2179     } else {
2180         ASSERT(cap->suspended_ccalling_tasks == task);
2181         cap->suspended_ccalling_tasks = task->next;
2182     }
2183     if (task->next) {
2184         task->next->prev = task->prev;
2185     }
2186     task->next = task->prev = NULL;
2187 }
2188
2189 /* ---------------------------------------------------------------------------
2190  * Suspending & resuming Haskell threads.
2191  * 
2192  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2193  * its capability before calling the C function.  This allows another
2194  * task to pick up the capability and carry on running Haskell
2195  * threads.  It also means that if the C call blocks, it won't lock
2196  * the whole system.
2197  *
2198  * The Haskell thread making the C call is put to sleep for the
2199  * duration of the call, on the susepended_ccalling_threads queue.  We
2200  * give out a token to the task, which it can use to resume the thread
2201  * on return from the C function.
2202  * ------------------------------------------------------------------------- */
2203    
2204 void *
2205 suspendThread (StgRegTable *reg)
2206 {
2207   Capability *cap;
2208   int saved_errno = errno;
2209   StgTSO *tso;
2210   Task *task;
2211
2212   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2213    */
2214   cap = regTableToCapability(reg);
2215
2216   task = cap->running_task;
2217   tso = cap->r.rCurrentTSO;
2218
2219   IF_DEBUG(scheduler,
2220            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2221
2222   // XXX this might not be necessary --SDM
2223   tso->what_next = ThreadRunGHC;
2224
2225   threadPaused(cap,tso);
2226
2227   if(tso->blocked_exceptions == NULL)  {
2228       tso->why_blocked = BlockedOnCCall;
2229       tso->blocked_exceptions = END_TSO_QUEUE;
2230   } else {
2231       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2232   }
2233
2234   // Hand back capability
2235   task->suspended_tso = tso;
2236
2237   ACQUIRE_LOCK(&cap->lock);
2238
2239   suspendTask(cap,task);
2240   cap->in_haskell = rtsFalse;
2241   releaseCapability_(cap);
2242   
2243   RELEASE_LOCK(&cap->lock);
2244
2245 #if defined(THREADED_RTS)
2246   /* Preparing to leave the RTS, so ensure there's a native thread/task
2247      waiting to take over.
2248   */
2249   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2250 #endif
2251
2252   errno = saved_errno;
2253   return task;
2254 }
2255
2256 StgRegTable *
2257 resumeThread (void *task_)
2258 {
2259     StgTSO *tso;
2260     Capability *cap;
2261     int saved_errno = errno;
2262     Task *task = task_;
2263
2264     cap = task->cap;
2265     // Wait for permission to re-enter the RTS with the result.
2266     waitForReturnCapability(&cap,task);
2267     // we might be on a different capability now... but if so, our
2268     // entry on the suspended_ccalling_tasks list will also have been
2269     // migrated.
2270
2271     // Remove the thread from the suspended list
2272     recoverSuspendedTask(cap,task);
2273
2274     tso = task->suspended_tso;
2275     task->suspended_tso = NULL;
2276     tso->link = END_TSO_QUEUE;
2277     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2278     
2279     if (tso->why_blocked == BlockedOnCCall) {
2280         awakenBlockedQueue(cap,tso->blocked_exceptions);
2281         tso->blocked_exceptions = NULL;
2282     }
2283     
2284     /* Reset blocking status */
2285     tso->why_blocked  = NotBlocked;
2286     
2287     cap->r.rCurrentTSO = tso;
2288     cap->in_haskell = rtsTrue;
2289     errno = saved_errno;
2290
2291     /* We might have GC'd, mark the TSO dirty again */
2292     dirtyTSO(tso);
2293
2294     IF_DEBUG(sanity, checkTSO(tso));
2295
2296     return &cap->r;
2297 }
2298
2299 /* ---------------------------------------------------------------------------
2300  * Comparing Thread ids.
2301  *
2302  * This is used from STG land in the implementation of the
2303  * instances of Eq/Ord for ThreadIds.
2304  * ------------------------------------------------------------------------ */
2305
2306 int
2307 cmp_thread(StgPtr tso1, StgPtr tso2) 
2308
2309   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2310   StgThreadID id2 = ((StgTSO *)tso2)->id;
2311  
2312   if (id1 < id2) return (-1);
2313   if (id1 > id2) return 1;
2314   return 0;
2315 }
2316
2317 /* ---------------------------------------------------------------------------
2318  * Fetching the ThreadID from an StgTSO.
2319  *
2320  * This is used in the implementation of Show for ThreadIds.
2321  * ------------------------------------------------------------------------ */
2322 int
2323 rts_getThreadId(StgPtr tso) 
2324 {
2325   return ((StgTSO *)tso)->id;
2326 }
2327
2328 #ifdef DEBUG
2329 void
2330 labelThread(StgPtr tso, char *label)
2331 {
2332   int len;
2333   void *buf;
2334
2335   /* Caveat: Once set, you can only set the thread name to "" */
2336   len = strlen(label)+1;
2337   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2338   strncpy(buf,label,len);
2339   /* Update will free the old memory for us */
2340   updateThreadLabel(((StgTSO *)tso)->id,buf);
2341 }
2342 #endif /* DEBUG */
2343
2344 /* ---------------------------------------------------------------------------
2345    Create a new thread.
2346
2347    The new thread starts with the given stack size.  Before the
2348    scheduler can run, however, this thread needs to have a closure
2349    (and possibly some arguments) pushed on its stack.  See
2350    pushClosure() in Schedule.h.
2351
2352    createGenThread() and createIOThread() (in SchedAPI.h) are
2353    convenient packaged versions of this function.
2354
2355    currently pri (priority) is only used in a GRAN setup -- HWL
2356    ------------------------------------------------------------------------ */
2357 #if defined(GRAN)
2358 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2359 StgTSO *
2360 createThread(nat size, StgInt pri)
2361 #else
2362 StgTSO *
2363 createThread(Capability *cap, nat size)
2364 #endif
2365 {
2366     StgTSO *tso;
2367     nat stack_size;
2368
2369     /* sched_mutex is *not* required */
2370
2371     /* First check whether we should create a thread at all */
2372 #if defined(PARALLEL_HASKELL)
2373     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2374     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2375         threadsIgnored++;
2376         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2377                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2378         return END_TSO_QUEUE;
2379     }
2380     threadsCreated++;
2381 #endif
2382
2383 #if defined(GRAN)
2384     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2385 #endif
2386
2387     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2388
2389     /* catch ridiculously small stack sizes */
2390     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2391         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2392     }
2393
2394     stack_size = size - TSO_STRUCT_SIZEW;
2395     
2396     tso = (StgTSO *)allocateLocal(cap, size);
2397     TICK_ALLOC_TSO(stack_size, 0);
2398
2399     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2400 #if defined(GRAN)
2401     SET_GRAN_HDR(tso, ThisPE);
2402 #endif
2403
2404     // Always start with the compiled code evaluator
2405     tso->what_next = ThreadRunGHC;
2406
2407     tso->why_blocked  = NotBlocked;
2408     tso->blocked_exceptions = NULL;
2409     tso->flags = TSO_DIRTY;
2410     
2411     tso->saved_errno = 0;
2412     tso->bound = NULL;
2413     
2414     tso->stack_size     = stack_size;
2415     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2416                           - TSO_STRUCT_SIZEW;
2417     tso->sp             = (P_)&(tso->stack) + stack_size;
2418
2419     tso->trec = NO_TREC;
2420     
2421 #ifdef PROFILING
2422     tso->prof.CCCS = CCS_MAIN;
2423 #endif
2424     
2425   /* put a stop frame on the stack */
2426     tso->sp -= sizeofW(StgStopFrame);
2427     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2428     tso->link = END_TSO_QUEUE;
2429     
2430   // ToDo: check this
2431 #if defined(GRAN)
2432     /* uses more flexible routine in GranSim */
2433     insertThread(tso, CurrentProc);
2434 #else
2435     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2436      * from its creation
2437      */
2438 #endif
2439     
2440 #if defined(GRAN) 
2441     if (RtsFlags.GranFlags.GranSimStats.Full) 
2442         DumpGranEvent(GR_START,tso);
2443 #elif defined(PARALLEL_HASKELL)
2444     if (RtsFlags.ParFlags.ParStats.Full) 
2445         DumpGranEvent(GR_STARTQ,tso);
2446     /* HACk to avoid SCHEDULE 
2447        LastTSO = tso; */
2448 #endif
2449     
2450     /* Link the new thread on the global thread list.
2451      */
2452     ACQUIRE_LOCK(&sched_mutex);
2453     tso->id = next_thread_id++;  // while we have the mutex
2454     tso->global_link = all_threads;
2455     all_threads = tso;
2456     RELEASE_LOCK(&sched_mutex);
2457     
2458 #if defined(DIST)
2459     tso->dist.priority = MandatoryPriority; //by default that is...
2460 #endif
2461     
2462 #if defined(GRAN)
2463     tso->gran.pri = pri;
2464 # if defined(DEBUG)
2465     tso->gran.magic = TSO_MAGIC; // debugging only
2466 # endif
2467     tso->gran.sparkname   = 0;
2468     tso->gran.startedat   = CURRENT_TIME; 
2469     tso->gran.exported    = 0;
2470     tso->gran.basicblocks = 0;
2471     tso->gran.allocs      = 0;
2472     tso->gran.exectime    = 0;
2473     tso->gran.fetchtime   = 0;
2474     tso->gran.fetchcount  = 0;
2475     tso->gran.blocktime   = 0;
2476     tso->gran.blockcount  = 0;
2477     tso->gran.blockedat   = 0;
2478     tso->gran.globalsparks = 0;
2479     tso->gran.localsparks  = 0;
2480     if (RtsFlags.GranFlags.Light)
2481         tso->gran.clock  = Now; /* local clock */
2482     else
2483         tso->gran.clock  = 0;
2484     
2485     IF_DEBUG(gran,printTSO(tso));
2486 #elif defined(PARALLEL_HASKELL)
2487 # if defined(DEBUG)
2488     tso->par.magic = TSO_MAGIC; // debugging only
2489 # endif
2490     tso->par.sparkname   = 0;
2491     tso->par.startedat   = CURRENT_TIME; 
2492     tso->par.exported    = 0;
2493     tso->par.basicblocks = 0;
2494     tso->par.allocs      = 0;
2495     tso->par.exectime    = 0;
2496     tso->par.fetchtime   = 0;
2497     tso->par.fetchcount  = 0;
2498     tso->par.blocktime   = 0;
2499     tso->par.blockcount  = 0;
2500     tso->par.blockedat   = 0;
2501     tso->par.globalsparks = 0;
2502     tso->par.localsparks  = 0;
2503 #endif
2504     
2505 #if defined(GRAN)
2506     globalGranStats.tot_threads_created++;
2507     globalGranStats.threads_created_on_PE[CurrentProc]++;
2508     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2509     globalGranStats.tot_sq_probes++;
2510 #elif defined(PARALLEL_HASKELL)
2511     // collect parallel global statistics (currently done together with GC stats)
2512     if (RtsFlags.ParFlags.ParStats.Global &&
2513         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2514         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2515         globalParStats.tot_threads_created++;
2516     }
2517 #endif 
2518     
2519 #if defined(GRAN)
2520     IF_GRAN_DEBUG(pri,
2521                   sched_belch("==__ schedule: Created TSO %d (%p);",
2522                               CurrentProc, tso, tso->id));
2523 #elif defined(PARALLEL_HASKELL)
2524     IF_PAR_DEBUG(verbose,
2525                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2526                              (long)tso->id, tso, advisory_thread_count));
2527 #else
2528     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2529                                    (long)tso->id, (long)tso->stack_size));
2530 #endif    
2531     return tso;
2532 }
2533
2534 #if defined(PAR)
2535 /* RFP:
2536    all parallel thread creation calls should fall through the following routine.
2537 */
2538 StgTSO *
2539 createThreadFromSpark(rtsSpark spark) 
2540 { StgTSO *tso;
2541   ASSERT(spark != (rtsSpark)NULL);
2542 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2543   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2544   { threadsIgnored++;
2545     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2546           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2547     return END_TSO_QUEUE;
2548   }
2549   else
2550   { threadsCreated++;
2551     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2552     if (tso==END_TSO_QUEUE)     
2553       barf("createSparkThread: Cannot create TSO");
2554 #if defined(DIST)
2555     tso->priority = AdvisoryPriority;
2556 #endif
2557     pushClosure(tso,spark);
2558     addToRunQueue(tso);
2559     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2560   }
2561   return tso;
2562 }
2563 #endif
2564
2565 /*
2566   Turn a spark into a thread.
2567   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2568 */
2569 #if 0
2570 StgTSO *
2571 activateSpark (rtsSpark spark) 
2572 {
2573   StgTSO *tso;
2574
2575   tso = createSparkThread(spark);
2576   if (RtsFlags.ParFlags.ParStats.Full) {   
2577     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2578       IF_PAR_DEBUG(verbose,
2579                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2580                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2581   }
2582   // ToDo: fwd info on local/global spark to thread -- HWL
2583   // tso->gran.exported =  spark->exported;
2584   // tso->gran.locked =   !spark->global;
2585   // tso->gran.sparkname = spark->name;
2586
2587   return tso;
2588 }
2589 #endif
2590
2591 /* ---------------------------------------------------------------------------
2592  * scheduleThread()
2593  *
2594  * scheduleThread puts a thread on the end  of the runnable queue.
2595  * This will usually be done immediately after a thread is created.
2596  * The caller of scheduleThread must create the thread using e.g.
2597  * createThread and push an appropriate closure
2598  * on this thread's stack before the scheduler is invoked.
2599  * ------------------------------------------------------------------------ */
2600
2601 void
2602 scheduleThread(Capability *cap, StgTSO *tso)
2603 {
2604     // The thread goes at the *end* of the run-queue, to avoid possible
2605     // starvation of any threads already on the queue.
2606     appendToRunQueue(cap,tso);
2607 }
2608
2609 Capability *
2610 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2611 {
2612     Task *task;
2613
2614     // We already created/initialised the Task
2615     task = cap->running_task;
2616
2617     // This TSO is now a bound thread; make the Task and TSO
2618     // point to each other.
2619     tso->bound = task;
2620
2621     task->tso = tso;
2622     task->ret = ret;
2623     task->stat = NoStatus;
2624
2625     appendToRunQueue(cap,tso);
2626
2627     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2628
2629 #if defined(GRAN)
2630     /* GranSim specific init */
2631     CurrentTSO = m->tso;                // the TSO to run
2632     procStatus[MainProc] = Busy;        // status of main PE
2633     CurrentProc = MainProc;             // PE to run it on
2634 #endif
2635
2636     cap = schedule(cap,task);
2637
2638     ASSERT(task->stat != NoStatus);
2639     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2640
2641     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2642     return cap;
2643 }
2644
2645 /* ----------------------------------------------------------------------------
2646  * Starting Tasks
2647  * ------------------------------------------------------------------------- */
2648
2649 #if defined(THREADED_RTS)
2650 void
2651 workerStart(Task *task)
2652 {
2653     Capability *cap;
2654
2655     // See startWorkerTask().
2656     ACQUIRE_LOCK(&task->lock);
2657     cap = task->cap;
2658     RELEASE_LOCK(&task->lock);
2659
2660     // set the thread-local pointer to the Task:
2661     taskEnter(task);
2662
2663     // schedule() runs without a lock.
2664     cap = schedule(cap,task);
2665
2666     // On exit from schedule(), we have a Capability.
2667     releaseCapability(cap);
2668     taskStop(task);
2669 }
2670 #endif
2671
2672 /* ---------------------------------------------------------------------------
2673  * initScheduler()
2674  *
2675  * Initialise the scheduler.  This resets all the queues - if the
2676  * queues contained any threads, they'll be garbage collected at the
2677  * next pass.
2678  *
2679  * ------------------------------------------------------------------------ */
2680
2681 void 
2682 initScheduler(void)
2683 {
2684 #if defined(GRAN)
2685   nat i;
2686   for (i=0; i<=MAX_PROC; i++) {
2687     run_queue_hds[i]      = END_TSO_QUEUE;
2688     run_queue_tls[i]      = END_TSO_QUEUE;
2689     blocked_queue_hds[i]  = END_TSO_QUEUE;
2690     blocked_queue_tls[i]  = END_TSO_QUEUE;
2691     ccalling_threadss[i]  = END_TSO_QUEUE;
2692     blackhole_queue[i]    = END_TSO_QUEUE;
2693     sleeping_queue        = END_TSO_QUEUE;
2694   }
2695 #elif !defined(THREADED_RTS)
2696   blocked_queue_hd  = END_TSO_QUEUE;
2697   blocked_queue_tl  = END_TSO_QUEUE;
2698   sleeping_queue    = END_TSO_QUEUE;
2699 #endif
2700
2701   blackhole_queue   = END_TSO_QUEUE;
2702   all_threads       = END_TSO_QUEUE;
2703
2704   context_switch = 0;
2705   interrupted    = 0;
2706
2707   RtsFlags.ConcFlags.ctxtSwitchTicks =
2708       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2709       
2710 #if defined(THREADED_RTS)
2711   /* Initialise the mutex and condition variables used by
2712    * the scheduler. */
2713   initMutex(&sched_mutex);
2714 #endif
2715   
2716   ACQUIRE_LOCK(&sched_mutex);
2717
2718   /* A capability holds the state a native thread needs in
2719    * order to execute STG code. At least one capability is
2720    * floating around (only THREADED_RTS builds have more than one).
2721    */
2722   initCapabilities();
2723
2724   initTaskManager();
2725
2726 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2727   initSparkPools();
2728 #endif
2729
2730 #if defined(THREADED_RTS)
2731   /*
2732    * Eagerly start one worker to run each Capability, except for
2733    * Capability 0.  The idea is that we're probably going to start a
2734    * bound thread on Capability 0 pretty soon, so we don't want a
2735    * worker task hogging it.
2736    */
2737   { 
2738       nat i;
2739       Capability *cap;
2740       for (i = 1; i < n_capabilities; i++) {
2741           cap = &capabilities[i];
2742           ACQUIRE_LOCK(&cap->lock);
2743           startWorkerTask(cap, workerStart);
2744           RELEASE_LOCK(&cap->lock);
2745       }
2746   }
2747 #endif
2748
2749   RELEASE_LOCK(&sched_mutex);
2750 }
2751
2752 void
2753 exitScheduler( void )
2754 {
2755     interrupted = rtsTrue;
2756     shutting_down_scheduler = rtsTrue;
2757
2758 #if defined(THREADED_RTS)
2759     { 
2760         Task *task;
2761         nat i;
2762         
2763         ACQUIRE_LOCK(&sched_mutex);
2764         task = newBoundTask();
2765         RELEASE_LOCK(&sched_mutex);
2766
2767         for (i = 0; i < n_capabilities; i++) {
2768             shutdownCapability(&capabilities[i], task);
2769         }
2770         boundTaskExiting(task);
2771         stopTaskManager();
2772     }
2773 #endif
2774 }
2775
2776 /* ---------------------------------------------------------------------------
2777    Where are the roots that we know about?
2778
2779         - all the threads on the runnable queue
2780         - all the threads on the blocked queue
2781         - all the threads on the sleeping queue
2782         - all the thread currently executing a _ccall_GC
2783         - all the "main threads"
2784      
2785    ------------------------------------------------------------------------ */
2786
2787 /* This has to be protected either by the scheduler monitor, or by the
2788         garbage collection monitor (probably the latter).
2789         KH @ 25/10/99
2790 */
2791
2792 void
2793 GetRoots( evac_fn evac )
2794 {
2795     nat i;
2796     Capability *cap;
2797     Task *task;
2798
2799 #if defined(GRAN)
2800     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2801         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2802             evac((StgClosure **)&run_queue_hds[i]);
2803         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2804             evac((StgClosure **)&run_queue_tls[i]);
2805         
2806         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2807             evac((StgClosure **)&blocked_queue_hds[i]);
2808         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2809             evac((StgClosure **)&blocked_queue_tls[i]);
2810         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2811             evac((StgClosure **)&ccalling_threads[i]);
2812     }
2813
2814     markEventQueue();
2815
2816 #else /* !GRAN */
2817
2818     for (i = 0; i < n_capabilities; i++) {
2819         cap = &capabilities[i];
2820         evac((StgClosure **)&cap->run_queue_hd);
2821         evac((StgClosure **)&cap->run_queue_tl);
2822         
2823         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2824              task=task->next) {
2825             evac((StgClosure **)&task->suspended_tso);
2826         }
2827     }
2828     
2829 #if !defined(THREADED_RTS)
2830     evac((StgClosure **)(void *)&blocked_queue_hd);
2831     evac((StgClosure **)(void *)&blocked_queue_tl);
2832     evac((StgClosure **)(void *)&sleeping_queue);
2833 #endif 
2834 #endif
2835
2836     // evac((StgClosure **)&blackhole_queue);
2837
2838 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2839     markSparkQueue(evac);
2840 #endif
2841     
2842 #if defined(RTS_USER_SIGNALS)
2843     // mark the signal handlers (signals should be already blocked)
2844     markSignalHandlers(evac);
2845 #endif
2846 }
2847
2848 /* -----------------------------------------------------------------------------
2849    performGC
2850
2851    This is the interface to the garbage collector from Haskell land.
2852    We provide this so that external C code can allocate and garbage
2853    collect when called from Haskell via _ccall_GC.
2854
2855    It might be useful to provide an interface whereby the programmer
2856    can specify more roots (ToDo).
2857    
2858    This needs to be protected by the GC condition variable above.  KH.
2859    -------------------------------------------------------------------------- */
2860
2861 static void (*extra_roots)(evac_fn);
2862
2863 static void
2864 performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
2865 {
2866     Task *task = myTask();
2867
2868     if (task == NULL) {
2869         ACQUIRE_LOCK(&sched_mutex);
2870         task = newBoundTask();
2871         RELEASE_LOCK(&sched_mutex);
2872         scheduleDoGC(NULL,task,force_major, get_roots);
2873         boundTaskExiting(task);
2874     } else {
2875         scheduleDoGC(NULL,task,force_major, get_roots);
2876     }
2877 }
2878
2879 void
2880 performGC(void)
2881 {
2882     performGC_(rtsFalse, GetRoots);
2883 }
2884
2885 void
2886 performMajorGC(void)
2887 {
2888     performGC_(rtsTrue, GetRoots);
2889 }
2890
2891 static void
2892 AllRoots(evac_fn evac)
2893 {
2894     GetRoots(evac);             // the scheduler's roots
2895     extra_roots(evac);          // the user's roots
2896 }
2897
2898 void
2899 performGCWithRoots(void (*get_roots)(evac_fn))
2900 {
2901     extra_roots = get_roots;
2902     performGC_(rtsFalse, AllRoots);
2903 }
2904
2905 /* -----------------------------------------------------------------------------
2906    Stack overflow
2907
2908    If the thread has reached its maximum stack size, then raise the
2909    StackOverflow exception in the offending thread.  Otherwise
2910    relocate the TSO into a larger chunk of memory and adjust its stack
2911    size appropriately.
2912    -------------------------------------------------------------------------- */
2913
2914 static StgTSO *
2915 threadStackOverflow(Capability *cap, StgTSO *tso)
2916 {
2917   nat new_stack_size, stack_words;
2918   lnat new_tso_size;
2919   StgPtr new_sp;
2920   StgTSO *dest;
2921
2922   IF_DEBUG(sanity,checkTSO(tso));
2923   if (tso->stack_size >= tso->max_stack_size) {
2924
2925     IF_DEBUG(gc,
2926              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2927                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2928              /* If we're debugging, just print out the top of the stack */
2929              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2930                                               tso->sp+64)));
2931
2932     /* Send this thread the StackOverflow exception */
2933     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2934     return tso;
2935   }
2936
2937   /* Try to double the current stack size.  If that takes us over the
2938    * maximum stack size for this thread, then use the maximum instead.
2939    * Finally round up so the TSO ends up as a whole number of blocks.
2940    */
2941   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2942   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2943                                        TSO_STRUCT_SIZE)/sizeof(W_);
2944   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2945   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2946
2947   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2948
2949   dest = (StgTSO *)allocate(new_tso_size);
2950   TICK_ALLOC_TSO(new_stack_size,0);
2951
2952   /* copy the TSO block and the old stack into the new area */
2953   memcpy(dest,tso,TSO_STRUCT_SIZE);
2954   stack_words = tso->stack + tso->stack_size - tso->sp;
2955   new_sp = (P_)dest + new_tso_size - stack_words;
2956   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2957
2958   /* relocate the stack pointers... */
2959   dest->sp         = new_sp;
2960   dest->stack_size = new_stack_size;
2961         
2962   /* Mark the old TSO as relocated.  We have to check for relocated
2963    * TSOs in the garbage collector and any primops that deal with TSOs.
2964    *
2965    * It's important to set the sp value to just beyond the end
2966    * of the stack, so we don't attempt to scavenge any part of the
2967    * dead TSO's stack.
2968    */
2969   tso->what_next = ThreadRelocated;
2970   tso->link = dest;
2971   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2972   tso->why_blocked = NotBlocked;
2973
2974   IF_PAR_DEBUG(verbose,
2975                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2976                      tso->id, tso, tso->stack_size);
2977                /* If we're debugging, just print out the top of the stack */
2978                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2979                                                 tso->sp+64)));
2980   
2981   IF_DEBUG(sanity,checkTSO(tso));
2982 #if 0
2983   IF_DEBUG(scheduler,printTSO(dest));
2984 #endif
2985
2986   return dest;
2987 }
2988
2989 /* ---------------------------------------------------------------------------
2990    Wake up a queue that was blocked on some resource.
2991    ------------------------------------------------------------------------ */
2992
2993 #if defined(GRAN)
2994 STATIC_INLINE void
2995 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2996 {
2997 }
2998 #elif defined(PARALLEL_HASKELL)
2999 STATIC_INLINE void
3000 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3001 {
3002   /* write RESUME events to log file and
3003      update blocked and fetch time (depending on type of the orig closure) */
3004   if (RtsFlags.ParFlags.ParStats.Full) {
3005     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
3006                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
3007                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
3008     if (emptyRunQueue())
3009       emitSchedule = rtsTrue;
3010
3011     switch (get_itbl(node)->type) {
3012         case FETCH_ME_BQ:
3013           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3014           break;
3015         case RBH:
3016         case FETCH_ME:
3017         case BLACKHOLE_BQ:
3018           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3019           break;
3020 #ifdef DIST
3021         case MVAR:
3022           break;
3023 #endif    
3024         default:
3025           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
3026         }
3027       }
3028 }
3029 #endif
3030
3031 #if defined(GRAN)
3032 StgBlockingQueueElement *
3033 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3034 {
3035     StgTSO *tso;
3036     PEs node_loc, tso_loc;
3037
3038     node_loc = where_is(node); // should be lifted out of loop
3039     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3040     tso_loc = where_is((StgClosure *)tso);
3041     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3042       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
3043       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3044       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3045       // insertThread(tso, node_loc);
3046       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3047                 ResumeThread,
3048                 tso, node, (rtsSpark*)NULL);
3049       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3050       // len_local++;
3051       // len++;
3052     } else { // TSO is remote (actually should be FMBQ)
3053       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3054                                   RtsFlags.GranFlags.Costs.gunblocktime +
3055                                   RtsFlags.GranFlags.Costs.latency;
3056       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3057                 UnblockThread,
3058                 tso, node, (rtsSpark*)NULL);
3059       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3060       // len++;
3061     }
3062     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3063     IF_GRAN_DEBUG(bq,
3064                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3065                           (node_loc==tso_loc ? "Local" : "Global"), 
3066                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3067     tso->block_info.closure = NULL;
3068     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
3069                              tso->id, tso));
3070 }
3071 #elif defined(PARALLEL_HASKELL)
3072 StgBlockingQueueElement *
3073 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3074 {
3075     StgBlockingQueueElement *next;
3076
3077     switch (get_itbl(bqe)->type) {
3078     case TSO:
3079       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3080       /* if it's a TSO just push it onto the run_queue */
3081       next = bqe->link;
3082       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3083       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3084       threadRunnable();
3085       unblockCount(bqe, node);
3086       /* reset blocking status after dumping event */
3087       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3088       break;
3089
3090     case BLOCKED_FETCH:
3091       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3092       next = bqe->link;
3093       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3094       PendingFetches = (StgBlockedFetch *)bqe;
3095       break;
3096
3097 # if defined(DEBUG)
3098       /* can ignore this case in a non-debugging setup; 
3099          see comments on RBHSave closures above */
3100     case CONSTR:
3101       /* check that the closure is an RBHSave closure */
3102       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3103              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3104              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3105       break;
3106
3107     default:
3108       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3109            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3110            (StgClosure *)bqe);
3111 # endif
3112     }
3113   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3114   return next;
3115 }
3116 #endif
3117
3118 StgTSO *
3119 unblockOne(Capability *cap, StgTSO *tso)
3120 {
3121   StgTSO *next;
3122
3123   ASSERT(get_itbl(tso)->type == TSO);
3124   ASSERT(tso->why_blocked != NotBlocked);
3125   tso->why_blocked = NotBlocked;
3126   next = tso->link;
3127   tso->link = END_TSO_QUEUE;
3128
3129   // We might have just migrated this TSO to our Capability:
3130   if (tso->bound) {
3131       tso->bound->cap = cap;
3132   }
3133
3134   appendToRunQueue(cap,tso);
3135
3136   // we're holding a newly woken thread, make sure we context switch
3137   // quickly so we can migrate it if necessary.
3138   context_switch = 1;
3139   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3140   return next;
3141 }
3142
3143
3144 #if defined(GRAN)
3145 void 
3146 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3147 {
3148   StgBlockingQueueElement *bqe;
3149   PEs node_loc;
3150   nat len = 0; 
3151
3152   IF_GRAN_DEBUG(bq, 
3153                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3154                       node, CurrentProc, CurrentTime[CurrentProc], 
3155                       CurrentTSO->id, CurrentTSO));
3156
3157   node_loc = where_is(node);
3158
3159   ASSERT(q == END_BQ_QUEUE ||
3160          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3161          get_itbl(q)->type == CONSTR); // closure (type constructor)
3162   ASSERT(is_unique(node));
3163
3164   /* FAKE FETCH: magically copy the node to the tso's proc;
3165      no Fetch necessary because in reality the node should not have been 
3166      moved to the other PE in the first place
3167   */
3168   if (CurrentProc!=node_loc) {
3169     IF_GRAN_DEBUG(bq, 
3170                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3171                         node, node_loc, CurrentProc, CurrentTSO->id, 
3172                         // CurrentTSO, where_is(CurrentTSO),
3173                         node->header.gran.procs));
3174     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3175     IF_GRAN_DEBUG(bq, 
3176                   debugBelch("## new bitmask of node %p is %#x\n",
3177                         node, node->header.gran.procs));
3178     if (RtsFlags.GranFlags.GranSimStats.Global) {
3179       globalGranStats.tot_fake_fetches++;
3180     }
3181   }
3182
3183   bqe = q;
3184   // ToDo: check: ASSERT(CurrentProc==node_loc);
3185   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3186     //next = bqe->link;
3187     /* 
3188        bqe points to the current element in the queue
3189        next points to the next element in the queue
3190     */
3191     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3192     //tso_loc = where_is(tso);
3193     len++;
3194     bqe = unblockOne(bqe, node);
3195   }
3196
3197   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3198      the closure to make room for the anchor of the BQ */
3199   if (bqe!=END_BQ_QUEUE) {
3200     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3201     /*
3202     ASSERT((info_ptr==&RBH_Save_0_info) ||
3203            (info_ptr==&RBH_Save_1_info) ||
3204            (info_ptr==&RBH_Save_2_info));
3205     */
3206     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3207     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3208     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3209
3210     IF_GRAN_DEBUG(bq,
3211                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3212                         node, info_type(node)));
3213   }
3214
3215   /* statistics gathering */
3216   if (RtsFlags.GranFlags.GranSimStats.Global) {
3217     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3218     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3219     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3220     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3221   }
3222   IF_GRAN_DEBUG(bq,
3223                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3224                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3225 }
3226 #elif defined(PARALLEL_HASKELL)
3227 void 
3228 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3229 {
3230   StgBlockingQueueElement *bqe;
3231
3232   IF_PAR_DEBUG(verbose, 
3233                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3234                      node, mytid));
3235 #ifdef DIST  
3236   //RFP
3237   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3238     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3239     return;
3240   }
3241 #endif
3242   
3243   ASSERT(q == END_BQ_QUEUE ||
3244          get_itbl(q)->type == TSO ||           
3245          get_itbl(q)->type == BLOCKED_FETCH || 
3246          get_itbl(q)->type == CONSTR); 
3247
3248   bqe = q;
3249   while (get_itbl(bqe)->type==TSO || 
3250          get_itbl(bqe)->type==BLOCKED_FETCH) {
3251     bqe = unblockOne(bqe, node);
3252   }
3253 }
3254
3255 #else   /* !GRAN && !PARALLEL_HASKELL */
3256
3257 void
3258 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3259 {
3260     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3261                              // Exception.cmm
3262     while (tso != END_TSO_QUEUE) {
3263         tso = unblockOne(cap,tso);
3264     }
3265 }
3266 #endif
3267
3268 /* ---------------------------------------------------------------------------
3269    Interrupt execution
3270    - usually called inside a signal handler so it mustn't do anything fancy.   
3271    ------------------------------------------------------------------------ */
3272
3273 void
3274 interruptStgRts(void)
3275 {
3276     interrupted    = 1;
3277     context_switch = 1;
3278 #if defined(THREADED_RTS)
3279     prodAllCapabilities();
3280 #endif
3281 }
3282
3283 /* -----------------------------------------------------------------------------
3284    Unblock a thread
3285
3286    This is for use when we raise an exception in another thread, which
3287    may be blocked.
3288    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3289    -------------------------------------------------------------------------- */
3290
3291 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3292 /*
3293   NB: only the type of the blocking queue is different in GranSim and GUM
3294       the operations on the queue-elements are the same
3295       long live polymorphism!
3296
3297   Locks: sched_mutex is held upon entry and exit.
3298
3299 */
3300 static void
3301 unblockThread(Capability *cap, StgTSO *tso)
3302 {
3303   StgBlockingQueueElement *t, **last;
3304
3305   switch (tso->why_blocked) {
3306
3307   case NotBlocked:
3308     return;  /* not blocked */
3309
3310   case BlockedOnSTM:
3311     // Be careful: nothing to do here!  We tell the scheduler that the thread
3312     // is runnable and we leave it to the stack-walking code to abort the 
3313     // transaction while unwinding the stack.  We should perhaps have a debugging
3314     // test to make sure that this really happens and that the 'zombie' transaction
3315     // does not get committed.
3316     goto done;
3317
3318   case BlockedOnMVar:
3319     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3320     {
3321       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3322       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3323
3324       last = (StgBlockingQueueElement **)&mvar->head;
3325       for (t = (StgBlockingQueueElement *)mvar->head; 
3326            t != END_BQ_QUEUE; 
3327            last = &t->link, last_tso = t, t = t->link) {
3328         if (t == (StgBlockingQueueElement *)tso) {
3329           *last = (StgBlockingQueueElement *)tso->link;
3330           if (mvar->tail == tso) {
3331             mvar->tail = (StgTSO *)last_tso;
3332           }
3333           goto done;
3334         }
3335       }
3336       barf("unblockThread (MVAR): TSO not found");
3337     }
3338
3339   case BlockedOnBlackHole:
3340     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3341     {
3342       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3343
3344       last = &bq->blocking_queue;
3345       for (t = bq->blocking_queue; 
3346            t != END_BQ_QUEUE; 
3347            last = &t->link, t = t->link) {
3348         if (t == (StgBlockingQueueElement *)tso) {
3349           *last = (StgBlockingQueueElement *)tso->link;
3350           goto done;
3351         }
3352       }
3353       barf("unblockThread (BLACKHOLE): TSO not found");
3354     }
3355
3356   case BlockedOnException:
3357     {
3358       StgTSO *target  = tso->block_info.tso;
3359
3360       ASSERT(get_itbl(target)->type == TSO);
3361
3362       if (target->what_next == ThreadRelocated) {
3363           target = target->link;
3364           ASSERT(get_itbl(target)->type == TSO);
3365       }
3366
3367       ASSERT(target->blocked_exceptions != NULL);
3368
3369       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3370       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3371            t != END_BQ_QUEUE; 
3372            last = &t->link, t = t->link) {
3373         ASSERT(get_itbl(t)->type == TSO);
3374         if (t == (StgBlockingQueueElement *)tso) {
3375           *last = (StgBlockingQueueElement *)tso->link;
3376           goto done;
3377         }
3378       }
3379       barf("unblockThread (Exception): TSO not found");
3380     }
3381
3382   case BlockedOnRead:
3383   case BlockedOnWrite:
3384 #if defined(mingw32_HOST_OS)
3385   case BlockedOnDoProc:
3386 #endif
3387     {
3388       /* take TSO off blocked_queue */
3389       StgBlockingQueueElement *prev = NULL;
3390       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3391            prev = t, t = t->link) {
3392         if (t == (StgBlockingQueueElement *)tso) {
3393           if (prev == NULL) {
3394             blocked_queue_hd = (StgTSO *)t->link;
3395             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3396               blocked_queue_tl = END_TSO_QUEUE;
3397             }
3398           } else {
3399             prev->link = t->link;
3400             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3401               blocked_queue_tl = (StgTSO *)prev;
3402             }
3403           }
3404 #if defined(mingw32_HOST_OS)
3405           /* (Cooperatively) signal that the worker thread should abort
3406            * the request.
3407            */
3408           abandonWorkRequest(tso->block_info.async_result->reqID);
3409 #endif
3410           goto done;
3411         }
3412       }
3413       barf("unblockThread (I/O): TSO not found");
3414     }
3415
3416   case BlockedOnDelay:
3417     {
3418       /* take TSO off sleeping_queue */
3419       StgBlockingQueueElement *prev = NULL;
3420       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3421            prev = t, t = t->link) {
3422         if (t == (StgBlockingQueueElement *)tso) {
3423           if (prev == NULL) {
3424             sleeping_queue = (StgTSO *)t->link;
3425           } else {
3426             prev->link = t->link;
3427           }
3428           goto done;
3429         }
3430       }
3431       barf("unblockThread (delay): TSO not found");
3432     }
3433
3434   default:
3435     barf("unblockThread");
3436   }
3437
3438  done:
3439   tso->link = END_TSO_QUEUE;
3440   tso->why_blocked = NotBlocked;
3441   tso->block_info.closure = NULL;
3442   pushOnRunQueue(cap,tso);
3443 }
3444 #else
3445 static void
3446 unblockThread(Capability *cap, StgTSO *tso)
3447 {
3448   StgTSO *t, **last;
3449   
3450   /* To avoid locking unnecessarily. */
3451   if (tso->why_blocked == NotBlocked) {
3452     return;
3453   }
3454
3455   switch (tso->why_blocked) {
3456
3457   case BlockedOnSTM:
3458     // Be careful: nothing to do here!  We tell the scheduler that the thread
3459     // is runnable and we leave it to the stack-walking code to abort the 
3460     // transaction while unwinding the stack.  We should perhaps have a debugging
3461     // test to make sure that this really happens and that the 'zombie' transaction
3462     // does not get committed.
3463     goto done;
3464
3465   case BlockedOnMVar:
3466     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3467     {
3468       StgTSO *last_tso = END_TSO_QUEUE;
3469       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3470
3471       last = &mvar->head;
3472       for (t = mvar->head; t != END_TSO_QUEUE; 
3473            last = &t->link, last_tso = t, t = t->link) {
3474         if (t == tso) {
3475           *last = tso->link;
3476           if (mvar->tail == tso) {
3477             mvar->tail = last_tso;
3478           }
3479           goto done;
3480         }
3481       }
3482       barf("unblockThread (MVAR): TSO not found");
3483     }
3484
3485   case BlockedOnBlackHole:
3486     {
3487       last = &blackhole_queue;
3488       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3489            last = &t->link, t = t->link) {
3490         if (t == tso) {
3491           *last = tso->link;
3492           goto done;
3493         }
3494       }
3495       barf("unblockThread (BLACKHOLE): TSO not found");
3496     }
3497
3498   case BlockedOnException:
3499     {
3500       StgTSO *target  = tso->block_info.tso;
3501
3502       ASSERT(get_itbl(target)->type == TSO);
3503
3504       while (target->what_next == ThreadRelocated) {
3505           target = target->link;
3506           ASSERT(get_itbl(target)->type == TSO);
3507       }
3508       
3509       ASSERT(target->blocked_exceptions != NULL);
3510
3511       last = &target->blocked_exceptions;
3512       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3513            last = &t->link, t = t->link) {
3514         ASSERT(get_itbl(t)->type == TSO);
3515         if (t == tso) {
3516           *last = tso->link;
3517           goto done;
3518         }
3519       }
3520       barf("unblockThread (Exception): TSO not found");
3521     }
3522
3523 #if !defined(THREADED_RTS)
3524   case BlockedOnRead:
3525   case BlockedOnWrite:
3526 #if defined(mingw32_HOST_OS)
3527   case BlockedOnDoProc:
3528 #endif
3529     {
3530       StgTSO *prev = NULL;
3531       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3532            prev = t, t = t->link) {
3533         if (t == tso) {
3534           if (prev == NULL) {
3535             blocked_queue_hd = t->link;
3536             if (blocked_queue_tl == t) {
3537               blocked_queue_tl = END_TSO_QUEUE;
3538             }
3539           } else {
3540             prev->link = t->link;
3541             if (blocked_queue_tl == t) {
3542               blocked_queue_tl = prev;
3543             }
3544           }
3545 #if defined(mingw32_HOST_OS)
3546           /* (Cooperatively) signal that the worker thread should abort
3547            * the request.
3548            */
3549           abandonWorkRequest(tso->block_info.async_result->reqID);
3550 #endif
3551           goto done;
3552         }
3553       }
3554       barf("unblockThread (I/O): TSO not found");
3555     }
3556
3557   case BlockedOnDelay:
3558     {
3559       StgTSO *prev = NULL;
3560       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3561            prev = t, t = t->link) {
3562         if (t == tso) {
3563           if (prev == NULL) {
3564             sleeping_queue = t->link;
3565           } else {
3566             prev->link = t->link;
3567           }
3568           goto done;
3569         }
3570       }
3571       barf("unblockThread (delay): TSO not found");
3572     }
3573 #endif
3574
3575   default:
3576     barf("unblockThread");
3577   }
3578
3579  done:
3580   tso->link = END_TSO_QUEUE;
3581   tso->why_blocked = NotBlocked;
3582   tso->block_info.closure = NULL;
3583   appendToRunQueue(cap,tso);
3584 }
3585 #endif
3586
3587 /* -----------------------------------------------------------------------------
3588  * checkBlackHoles()
3589  *
3590  * Check the blackhole_queue for threads that can be woken up.  We do
3591  * this periodically: before every GC, and whenever the run queue is
3592  * empty.
3593  *
3594  * An elegant solution might be to just wake up all the blocked
3595  * threads with awakenBlockedQueue occasionally: they'll go back to
3596  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3597  * doesn't give us a way to tell whether we've actually managed to
3598  * wake up any threads, so we would be busy-waiting.
3599  *
3600  * -------------------------------------------------------------------------- */
3601
3602 static rtsBool
3603 checkBlackHoles (Capability *cap)
3604 {
3605     StgTSO **prev, *t;
3606     rtsBool any_woke_up = rtsFalse;
3607     StgHalfWord type;
3608
3609     // blackhole_queue is global:
3610     ASSERT_LOCK_HELD(&sched_mutex);
3611
3612     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3613
3614     // ASSUMES: sched_mutex
3615     prev = &blackhole_queue;
3616     t = blackhole_queue;
3617     while (t != END_TSO_QUEUE) {
3618         ASSERT(t->why_blocked == BlockedOnBlackHole);
3619         type = get_itbl(t->block_info.closure)->type;
3620         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3621             IF_DEBUG(sanity,checkTSO(t));
3622             t = unblockOne(cap, t);
3623             // urk, the threads migrate to the current capability
3624             // here, but we'd like to keep them on the original one.
3625             *prev = t;
3626             any_woke_up = rtsTrue;
3627         } else {
3628             prev = &t->link;
3629             t = t->link;
3630         }
3631     }
3632
3633     return any_woke_up;
3634 }
3635
3636 /* -----------------------------------------------------------------------------
3637  * raiseAsync()
3638  *
3639  * The following function implements the magic for raising an
3640  * asynchronous exception in an existing thread.
3641  *
3642  * We first remove the thread from any queue on which it might be
3643  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3644  *
3645  * We strip the stack down to the innermost CATCH_FRAME, building
3646  * thunks in the heap for all the active computations, so they can 
3647  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3648  * an application of the handler to the exception, and push it on
3649  * the top of the stack.
3650  * 
3651  * How exactly do we save all the active computations?  We create an
3652  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3653  * AP_STACKs pushes everything from the corresponding update frame
3654  * upwards onto the stack.  (Actually, it pushes everything up to the
3655  * next update frame plus a pointer to the next AP_STACK object.
3656  * Entering the next AP_STACK object pushes more onto the stack until we
3657  * reach the last AP_STACK object - at which point the stack should look
3658  * exactly as it did when we killed the TSO and we can continue
3659  * execution by entering the closure on top of the stack.
3660  *
3661  * We can also kill a thread entirely - this happens if either (a) the 
3662  * exception passed to raiseAsync is NULL, or (b) there's no
3663  * CATCH_FRAME on the stack.  In either case, we strip the entire
3664  * stack and replace the thread with a zombie.
3665  *
3666  * ToDo: in THREADED_RTS mode, this function is only safe if either
3667  * (a) we hold all the Capabilities (eg. in GC, or if there is only
3668  * one Capability), or (b) we own the Capability that the TSO is
3669  * currently blocked on or on the run queue of.
3670  *
3671  * -------------------------------------------------------------------------- */
3672  
3673 void
3674 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3675 {
3676     raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3677 }
3678
3679 void
3680 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3681 {
3682     raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3683 }
3684
3685 static void
3686 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3687             rtsBool stop_at_atomically, StgPtr stop_here)
3688 {
3689     StgRetInfoTable *info;
3690     StgPtr sp, frame;
3691     nat i;
3692   
3693     // Thread already dead?
3694     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3695         return;
3696     }
3697
3698     IF_DEBUG(scheduler, 
3699              sched_belch("raising exception in thread %ld.", (long)tso->id));
3700     
3701     // Remove it from any blocking queues
3702     unblockThread(cap,tso);
3703
3704     // mark it dirty; we're about to change its stack.
3705     dirtyTSO(tso);
3706
3707     sp = tso->sp;
3708     
3709     // The stack freezing code assumes there's a closure pointer on
3710     // the top of the stack, so we have to arrange that this is the case...
3711     //
3712     if (sp[0] == (W_)&stg_enter_info) {
3713         sp++;
3714     } else {
3715         sp--;
3716         sp[0] = (W_)&stg_dummy_ret_closure;
3717     }
3718
3719     frame = sp + 1;
3720     while (stop_here == NULL || frame < stop_here) {
3721
3722         // 1. Let the top of the stack be the "current closure"
3723         //
3724         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3725         // CATCH_FRAME.
3726         //
3727         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3728         // current closure applied to the chunk of stack up to (but not
3729         // including) the update frame.  This closure becomes the "current
3730         // closure".  Go back to step 2.
3731         //
3732         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3733         // top of the stack applied to the exception.
3734         // 
3735         // 5. If it's a STOP_FRAME, then kill the thread.
3736         // 
3737         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3738         // transaction
3739        
3740         info = get_ret_itbl((StgClosure *)frame);
3741
3742         switch (info->i.type) {
3743
3744         case UPDATE_FRAME:
3745         {
3746             StgAP_STACK * ap;
3747             nat words;
3748             
3749             // First build an AP_STACK consisting of the stack chunk above the
3750             // current update frame, with the top word on the stack as the
3751             // fun field.
3752             //
3753             words = frame - sp - 1;
3754             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3755             
3756             ap->size = words;
3757             ap->fun  = (StgClosure *)sp[0];
3758             sp++;
3759             for(i=0; i < (nat)words; ++i) {
3760                 ap->payload[i] = (StgClosure *)*sp++;
3761             }
3762             
3763             SET_HDR(ap,&stg_AP_STACK_info,
3764                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3765             TICK_ALLOC_UP_THK(words+1,0);
3766             
3767             IF_DEBUG(scheduler,
3768                      debugBelch("sched: Updating ");
3769                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3770                      debugBelch(" with ");
3771                      printObj((StgClosure *)ap);
3772                 );
3773
3774             // Replace the updatee with an indirection
3775             //
3776             // Warning: if we're in a loop, more than one update frame on
3777             // the stack may point to the same object.  Be careful not to
3778             // overwrite an IND_OLDGEN in this case, because we'll screw
3779             // up the mutable lists.  To be on the safe side, don't
3780             // overwrite any kind of indirection at all.  See also
3781             // threadSqueezeStack in GC.c, where we have to make a similar
3782             // check.
3783             //
3784             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3785                 // revert the black hole
3786                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3787                                (StgClosure *)ap);
3788             }
3789             sp += sizeofW(StgUpdateFrame) - 1;
3790             sp[0] = (W_)ap; // push onto stack
3791             frame = sp + 1;
3792             continue; //no need to bump frame
3793         }
3794
3795         case STOP_FRAME:
3796             // We've stripped the entire stack, the thread is now dead.
3797             tso->what_next = ThreadKilled;
3798             tso->sp = frame + sizeofW(StgStopFrame);
3799             return;
3800
3801         case CATCH_FRAME:
3802             // If we find a CATCH_FRAME, and we've got an exception to raise,
3803             // then build the THUNK raise(exception), and leave it on
3804             // top of the CATCH_FRAME ready to enter.
3805             //
3806         {
3807 #ifdef PROFILING
3808             StgCatchFrame *cf = (StgCatchFrame *)frame;
3809 #endif
3810             StgThunk *raise;
3811             
3812             if (exception == NULL) break;
3813
3814             // we've got an exception to raise, so let's pass it to the
3815             // handler in this frame.
3816             //
3817             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3818             TICK_ALLOC_SE_THK(1,0);
3819             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3820             raise->payload[0] = exception;
3821             
3822             // throw away the stack from Sp up to the CATCH_FRAME.
3823             //
3824             sp = frame - 1;
3825             
3826             /* Ensure that async excpetions are blocked now, so we don't get
3827              * a surprise exception before we get around to executing the
3828              * handler.
3829              */
3830             if (tso->blocked_exceptions == NULL) {
3831                 tso->blocked_exceptions = END_TSO_QUEUE;
3832             }
3833
3834             /* Put the newly-built THUNK on top of the stack, ready to execute
3835              * when the thread restarts.
3836              */
3837             sp[0] = (W_)raise;
3838             sp[-1] = (W_)&stg_enter_info;
3839             tso->sp = sp-1;
3840             tso->what_next = ThreadRunGHC;
3841             IF_DEBUG(sanity, checkTSO(tso));
3842             return;
3843         }
3844             
3845         case ATOMICALLY_FRAME:
3846             if (stop_at_atomically) {
3847                 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3848                 stmCondemnTransaction(cap, tso -> trec);
3849 #ifdef REG_R1
3850                 tso->sp = frame;
3851 #else
3852                 // R1 is not a register: the return convention for IO in
3853                 // this case puts the return value on the stack, so we
3854                 // need to set up the stack to return to the atomically
3855                 // frame properly...
3856                 tso->sp = frame - 2;
3857                 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3858                 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3859 #endif
3860                 tso->what_next = ThreadRunGHC;
3861                 return;
3862             }
3863             // Not stop_at_atomically... fall through and abort the
3864             // transaction.
3865             
3866         case CATCH_RETRY_FRAME:
3867             // IF we find an ATOMICALLY_FRAME then we abort the
3868             // current transaction and propagate the exception.  In
3869             // this case (unlike ordinary exceptions) we do not care
3870             // whether the transaction is valid or not because its
3871             // possible validity cannot have caused the exception
3872             // and will not be visible after the abort.
3873             IF_DEBUG(stm,
3874                      debugBelch("Found atomically block delivering async exception\n"));
3875             StgTRecHeader *trec = tso -> trec;
3876             StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3877             stmAbortTransaction(cap, trec);
3878             tso -> trec = outer;
3879             break;
3880             
3881         default:
3882             break;
3883         }
3884
3885         // move on to the next stack frame
3886         frame += stack_frame_sizeW((StgClosure *)frame);
3887     }
3888
3889     // if we got here, then we stopped at stop_here
3890     ASSERT(stop_here != NULL);
3891 }
3892
3893 /* -----------------------------------------------------------------------------
3894    Deleting threads
3895
3896    This is used for interruption (^C) and forking, and corresponds to
3897    raising an exception but without letting the thread catch the
3898    exception.
3899    -------------------------------------------------------------------------- */
3900
3901 static void 
3902 deleteThread (Capability *cap, StgTSO *tso)
3903 {
3904   if (tso->why_blocked != BlockedOnCCall &&
3905       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3906       raiseAsync(cap,tso,NULL);
3907   }
3908 }
3909
3910 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3911 static void 
3912 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3913 { // for forkProcess only:
3914   // delete thread without giving it a chance to catch the KillThread exception
3915
3916   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3917       return;
3918   }
3919
3920   if (tso->why_blocked != BlockedOnCCall &&
3921       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3922       unblockThread(cap,tso);
3923   }
3924
3925   tso->what_next = ThreadKilled;
3926 }
3927 #endif
3928
3929 /* -----------------------------------------------------------------------------
3930    raiseExceptionHelper
3931    
3932    This function is called by the raise# primitve, just so that we can
3933    move some of the tricky bits of raising an exception from C-- into
3934    C.  Who knows, it might be a useful re-useable thing here too.
3935    -------------------------------------------------------------------------- */
3936
3937 StgWord
3938 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3939 {
3940     Capability *cap = regTableToCapability(reg);
3941     StgThunk *raise_closure = NULL;
3942     StgPtr p, next;
3943     StgRetInfoTable *info;
3944     //
3945     // This closure represents the expression 'raise# E' where E
3946     // is the exception raise.  It is used to overwrite all the
3947     // thunks which are currently under evaluataion.
3948     //
3949
3950     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
3951     // LDV profiling: stg_raise_info has THUNK as its closure
3952     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3953     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3954     // 1 does not cause any problem unless profiling is performed.
3955     // However, when LDV profiling goes on, we need to linearly scan
3956     // small object pool, where raise_closure is stored, so we should
3957     // use MIN_UPD_SIZE.
3958     //
3959     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3960     //                                 sizeofW(StgClosure)+1);
3961     //
3962
3963     //
3964     // Walk up the stack, looking for the catch frame.  On the way,
3965     // we update any closures pointed to from update frames with the
3966     // raise closure that we just built.
3967     //
3968     p = tso->sp;
3969     while(1) {
3970         info = get_ret_itbl((StgClosure *)p);
3971         next = p + stack_frame_sizeW((StgClosure *)p);
3972         switch (info->i.type) {
3973             
3974         case UPDATE_FRAME:
3975             // Only create raise_closure if we need to.
3976             if (raise_closure == NULL) {
3977                 raise_closure = 
3978                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3979                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3980                 raise_closure->payload[0] = exception;
3981             }
3982             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3983             p = next;
3984             continue;
3985
3986         case ATOMICALLY_FRAME:
3987             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3988             tso->sp = p;
3989             return ATOMICALLY_FRAME;
3990             
3991         case CATCH_FRAME:
3992             tso->sp = p;
3993             return CATCH_FRAME;
3994
3995         case CATCH_STM_FRAME:
3996             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3997             tso->sp = p;
3998             return CATCH_STM_FRAME;
3999             
4000         case STOP_FRAME:
4001             tso->sp = p;
4002             return STOP_FRAME;
4003
4004         case CATCH_RETRY_FRAME:
4005         default:
4006             p = next; 
4007             continue;
4008         }
4009     }
4010 }
4011
4012
4013 /* -----------------------------------------------------------------------------
4014    findRetryFrameHelper
4015
4016    This function is called by the retry# primitive.  It traverses the stack
4017    leaving tso->sp referring to the frame which should handle the retry.  
4018
4019    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
4020    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
4021
4022    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
4023    despite the similar implementation.
4024
4025    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
4026    not be created within memory transactions.
4027    -------------------------------------------------------------------------- */
4028
4029 StgWord
4030 findRetryFrameHelper (StgTSO *tso)
4031 {
4032   StgPtr           p, next;
4033   StgRetInfoTable *info;
4034
4035   p = tso -> sp;
4036   while (1) {
4037     info = get_ret_itbl((StgClosure *)p);
4038     next = p + stack_frame_sizeW((StgClosure *)p);
4039     switch (info->i.type) {
4040       
4041     case ATOMICALLY_FRAME:
4042       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
4043       tso->sp = p;
4044       return ATOMICALLY_FRAME;
4045       
4046     case CATCH_RETRY_FRAME:
4047       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
4048       tso->sp = p;
4049       return CATCH_RETRY_FRAME;
4050       
4051     case CATCH_STM_FRAME:
4052     default:
4053       ASSERT(info->i.type != CATCH_FRAME);
4054       ASSERT(info->i.type != STOP_FRAME);
4055       p = next; 
4056       continue;
4057     }
4058   }
4059 }
4060
4061 /* -----------------------------------------------------------------------------
4062    resurrectThreads is called after garbage collection on the list of
4063    threads found to be garbage.  Each of these threads will be woken
4064    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4065    on an MVar, or NonTermination if the thread was blocked on a Black
4066    Hole.
4067
4068    Locks: assumes we hold *all* the capabilities.
4069    -------------------------------------------------------------------------- */
4070
4071 void
4072 resurrectThreads (StgTSO *threads)
4073 {
4074     StgTSO *tso, *next;
4075     Capability *cap;
4076
4077     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4078         next = tso->global_link;
4079         tso->global_link = all_threads;
4080         all_threads = tso;
4081         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4082         
4083         // Wake up the thread on the Capability it was last on for a
4084         // bound thread, or last_free_capability otherwise.
4085         if (tso->bound) {
4086             cap = tso->bound->cap;
4087         } else {
4088             cap = last_free_capability;
4089         }
4090         
4091         switch (tso->why_blocked) {
4092         case BlockedOnMVar:
4093         case BlockedOnException:
4094             /* Called by GC - sched_mutex lock is currently held. */
4095             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4096             break;
4097         case BlockedOnBlackHole:
4098             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4099             break;
4100         case BlockedOnSTM:
4101             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4102             break;
4103         case NotBlocked:
4104             /* This might happen if the thread was blocked on a black hole
4105              * belonging to a thread that we've just woken up (raiseAsync
4106              * can wake up threads, remember...).
4107              */
4108             continue;
4109         default:
4110             barf("resurrectThreads: thread blocked in a strange way");
4111         }
4112     }
4113 }
4114
4115 /* ----------------------------------------------------------------------------
4116  * Debugging: why is a thread blocked
4117  * [Also provides useful information when debugging threaded programs
4118  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4119    ------------------------------------------------------------------------- */
4120
4121 #if DEBUG
4122 static void
4123 printThreadBlockage(StgTSO *tso)
4124 {
4125   switch (tso->why_blocked) {
4126   case BlockedOnRead:
4127     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4128     break;
4129   case BlockedOnWrite:
4130     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4131     break;
4132 #if defined(mingw32_HOST_OS)
4133     case BlockedOnDoProc:
4134     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4135     break;
4136 #endif
4137   case BlockedOnDelay:
4138     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4139     break;
4140   case BlockedOnMVar:
4141     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4142     break;
4143   case BlockedOnException:
4144     debugBelch("is blocked on delivering an exception to thread %d",
4145             tso->block_info.tso->id);
4146     break;
4147   case BlockedOnBlackHole:
4148     debugBelch("is blocked on a black hole");
4149     break;
4150   case NotBlocked:
4151     debugBelch("is not blocked");
4152     break;
4153 #if defined(PARALLEL_HASKELL)
4154   case BlockedOnGA:
4155     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4156             tso->block_info.closure, info_type(tso->block_info.closure));
4157     break;
4158   case BlockedOnGA_NoSend:
4159     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4160             tso->block_info.closure, info_type(tso->block_info.closure));
4161     break;
4162 #endif
4163   case BlockedOnCCall:
4164     debugBelch("is blocked on an external call");
4165     break;
4166   case BlockedOnCCall_NoUnblockExc:
4167     debugBelch("is blocked on an external call (exceptions were already blocked)");
4168     break;
4169   case BlockedOnSTM:
4170     debugBelch("is blocked on an STM operation");
4171     break;
4172   default:
4173     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4174          tso->why_blocked, tso->id, tso);
4175   }
4176 }
4177
4178 void
4179 printThreadStatus(StgTSO *t)
4180 {
4181     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4182     {
4183       void *label = lookupThreadLabel(t->id);
4184       if (label) debugBelch("[\"%s\"] ",(char *)label);
4185     }
4186     if (t->what_next == ThreadRelocated) {
4187         debugBelch("has been relocated...\n");
4188     } else {
4189         switch (t->what_next) {
4190         case ThreadKilled:
4191             debugBelch("has been killed");
4192             break;
4193         case ThreadComplete:
4194             debugBelch("has completed");
4195             break;
4196         default:
4197             printThreadBlockage(t);
4198         }
4199         debugBelch("\n");
4200     }
4201 }
4202
4203 void
4204 printAllThreads(void)
4205 {
4206   StgTSO *t, *next;
4207   nat i;
4208   Capability *cap;
4209
4210 # if defined(GRAN)
4211   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4212   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4213                        time_string, rtsFalse/*no commas!*/);
4214
4215   debugBelch("all threads at [%s]:\n", time_string);
4216 # elif defined(PARALLEL_HASKELL)
4217   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4218   ullong_format_string(CURRENT_TIME,
4219                        time_string, rtsFalse/*no commas!*/);
4220
4221   debugBelch("all threads at [%s]:\n", time_string);
4222 # else
4223   debugBelch("all threads:\n");
4224 # endif
4225
4226   for (i = 0; i < n_capabilities; i++) {
4227       cap = &capabilities[i];
4228       debugBelch("threads on capability %d:\n", cap->no);
4229       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4230           printThreadStatus(t);
4231       }
4232   }
4233
4234   debugBelch("other threads:\n");
4235   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4236       if (t->why_blocked != NotBlocked) {
4237           printThreadStatus(t);
4238       }
4239       if (t->what_next == ThreadRelocated) {
4240           next = t->link;
4241       } else {
4242           next = t->global_link;
4243       }
4244   }
4245 }
4246
4247 // useful from gdb
4248 void 
4249 printThreadQueue(StgTSO *t)
4250 {
4251     nat i = 0;
4252     for (; t != END_TSO_QUEUE; t = t->link) {
4253         printThreadStatus(t);
4254         i++;
4255     }
4256     debugBelch("%d threads on queue\n", i);
4257 }
4258
4259 /* 
4260    Print a whole blocking queue attached to node (debugging only).
4261 */
4262 # if defined(PARALLEL_HASKELL)
4263 void 
4264 print_bq (StgClosure *node)
4265 {
4266   StgBlockingQueueElement *bqe;
4267   StgTSO *tso;
4268   rtsBool end;
4269
4270   debugBelch("## BQ of closure %p (%s): ",
4271           node, info_type(node));
4272
4273   /* should cover all closures that may have a blocking queue */
4274   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4275          get_itbl(node)->type == FETCH_ME_BQ ||
4276          get_itbl(node)->type == RBH ||
4277          get_itbl(node)->type == MVAR);
4278     
4279   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4280
4281   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4282 }
4283
4284 /* 
4285    Print a whole blocking queue starting with the element bqe.
4286 */
4287 void 
4288 print_bqe (StgBlockingQueueElement *bqe)
4289 {
4290   rtsBool end;
4291
4292   /* 
4293      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4294   */
4295   for (end = (bqe==END_BQ_QUEUE);
4296        !end; // iterate until bqe points to a CONSTR
4297        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4298        bqe = end ? END_BQ_QUEUE : bqe->link) {
4299     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4300     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4301     /* types of closures that may appear in a blocking queue */
4302     ASSERT(get_itbl(bqe)->type == TSO ||           
4303            get_itbl(bqe)->type == BLOCKED_FETCH || 
4304            get_itbl(bqe)->type == CONSTR); 
4305     /* only BQs of an RBH end with an RBH_Save closure */
4306     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4307
4308     switch (get_itbl(bqe)->type) {
4309     case TSO:
4310       debugBelch(" TSO %u (%x),",
4311               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4312       break;
4313     case BLOCKED_FETCH:
4314       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4315               ((StgBlockedFetch *)bqe)->node, 
4316               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4317               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4318               ((StgBlockedFetch *)bqe)->ga.weight);
4319       break;
4320     case CONSTR:
4321       debugBelch(" %s (IP %p),",
4322               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4323                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4324                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4325                "RBH_Save_?"), get_itbl(bqe));
4326       break;
4327     default:
4328       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4329            info_type((StgClosure *)bqe)); // , node, info_type(node));
4330       break;
4331     }
4332   } /* for */
4333   debugBelch("\n");
4334 }
4335 # elif defined(GRAN)
4336 void 
4337 print_bq (StgClosure *node)
4338 {
4339   StgBlockingQueueElement *bqe;
4340   PEs node_loc, tso_loc;
4341   rtsBool end;
4342
4343   /* should cover all closures that may have a blocking queue */
4344   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4345          get_itbl(node)->type == FETCH_ME_BQ ||
4346          get_itbl(node)->type == RBH);
4347     
4348   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4349   node_loc = where_is(node);
4350
4351   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4352           node, info_type(node), node_loc);
4353
4354   /* 
4355      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4356   */
4357   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4358        !end; // iterate until bqe points to a CONSTR
4359        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4360     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4361     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4362     /* types of closures that may appear in a blocking queue */
4363     ASSERT(get_itbl(bqe)->type == TSO ||           
4364            get_itbl(bqe)->type == CONSTR); 
4365     /* only BQs of an RBH end with an RBH_Save closure */
4366     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4367
4368     tso_loc = where_is((StgClosure *)bqe);
4369     switch (get_itbl(bqe)->type) {
4370     case TSO:
4371       debugBelch(" TSO %d (%p) on [PE %d],",
4372               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4373       break;
4374     case CONSTR:
4375       debugBelch(" %s (IP %p),",
4376               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4377                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4378                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4379                "RBH_Save_?"), get_itbl(bqe));
4380       break;
4381     default:
4382       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4383            info_type((StgClosure *)bqe), node, info_type(node));
4384       break;
4385     }
4386   } /* for */
4387   debugBelch("\n");
4388 }
4389 # endif
4390
4391 #if defined(PARALLEL_HASKELL)
4392 static nat
4393 run_queue_len(void)
4394 {
4395     nat i;
4396     StgTSO *tso;
4397     
4398     for (i=0, tso=run_queue_hd; 
4399          tso != END_TSO_QUEUE;
4400          i++, tso=tso->link) {
4401         /* nothing */
4402     }
4403         
4404     return i;
4405 }
4406 #endif
4407
4408 void
4409 sched_belch(char *s, ...)
4410 {
4411     va_list ap;
4412     va_start(ap,s);
4413 #ifdef THREADED_RTS
4414     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4415 #elif defined(PARALLEL_HASKELL)
4416     debugBelch("== ");
4417 #else
4418     debugBelch("sched: ");
4419 #endif
4420     vdebugBelch(s, ap);
4421     debugBelch("\n");
4422     va_end(ap);
4423 }
4424
4425 #endif /* DEBUG */