6fa21b7549c0a7489d39fdf1092ab7d14c1672f2
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 /* -----------------------------------------------------------------------------
76  * Global variables
77  * -------------------------------------------------------------------------- */
78
79 #if defined(GRAN)
80
81 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
82 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
83
84 /* 
85    In GranSim we have a runnable and a blocked queue for each processor.
86    In order to minimise code changes new arrays run_queue_hds/tls
87    are created. run_queue_hd is then a short cut (macro) for
88    run_queue_hds[CurrentProc] (see GranSim.h).
89    -- HWL
90 */
91 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
92 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
93 StgTSO *ccalling_threadss[MAX_PROC];
94 /* We use the same global list of threads (all_threads) in GranSim as in
95    the std RTS (i.e. we are cheating). However, we don't use this list in
96    the GranSim specific code at the moment (so we are only potentially
97    cheating).  */
98
99 #else /* !GRAN */
100
101 #if !defined(THREADED_RTS)
102 // Blocked/sleeping thrads
103 StgTSO *blocked_queue_hd = NULL;
104 StgTSO *blocked_queue_tl = NULL;
105 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
106 #endif
107
108 /* Threads blocked on blackholes.
109  * LOCK: sched_mutex+capability, or all capabilities
110  */
111 StgTSO *blackhole_queue = NULL;
112 #endif
113
114 /* The blackhole_queue should be checked for threads to wake up.  See
115  * Schedule.h for more thorough comment.
116  * LOCK: none (doesn't matter if we miss an update)
117  */
118 rtsBool blackholes_need_checking = rtsFalse;
119
120 /* Linked list of all threads.
121  * Used for detecting garbage collected threads.
122  * LOCK: sched_mutex+capability, or all capabilities
123  */
124 StgTSO *all_threads = NULL;
125
126 /* flag set by signal handler to precipitate a context switch
127  * LOCK: none (just an advisory flag)
128  */
129 int context_switch = 0;
130
131 /* flag that tracks whether we have done any execution in this time slice.
132  * LOCK: currently none, perhaps we should lock (but needs to be
133  * updated in the fast path of the scheduler).
134  */
135 nat recent_activity = ACTIVITY_YES;
136
137 /* if this flag is set as well, give up execution
138  * LOCK: none (changes once, from false->true)
139  */
140 rtsBool interrupted = rtsFalse;
141
142 /* Next thread ID to allocate.
143  * LOCK: sched_mutex
144  */
145 static StgThreadID next_thread_id = 1;
146
147 /* The smallest stack size that makes any sense is:
148  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
149  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
150  *  + 1                       (the closure to enter)
151  *  + 1                       (stg_ap_v_ret)
152  *  + 1                       (spare slot req'd by stg_ap_v_ret)
153  *
154  * A thread with this stack will bomb immediately with a stack
155  * overflow, which will increase its stack size.  
156  */
157 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
158
159 #if defined(GRAN)
160 StgTSO *CurrentTSO;
161 #endif
162
163 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
164  *  exists - earlier gccs apparently didn't.
165  *  -= chak
166  */
167 StgTSO dummy_tso;
168
169 /*
170  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
171  * in an MT setting, needed to signal that a worker thread shouldn't hang around
172  * in the scheduler when it is out of work.
173  */
174 rtsBool shutting_down_scheduler = rtsFalse;
175
176 /*
177  * This mutex protects most of the global scheduler data in
178  * the THREADED_RTS runtime.
179  */
180 #if defined(THREADED_RTS)
181 Mutex sched_mutex;
182 #endif
183
184 #if defined(PARALLEL_HASKELL)
185 StgTSO *LastTSO;
186 rtsTime TimeOfLastYield;
187 rtsBool emitSchedule = rtsTrue;
188 #endif
189
190 /* -----------------------------------------------------------------------------
191  * static function prototypes
192  * -------------------------------------------------------------------------- */
193
194 static Capability *schedule (Capability *initialCapability, Task *task);
195
196 //
197 // These function all encapsulate parts of the scheduler loop, and are
198 // abstracted only to make the structure and control flow of the
199 // scheduler clearer.
200 //
201 static void schedulePreLoop (void);
202 #if defined(THREADED_RTS)
203 static void schedulePushWork(Capability *cap, Task *task);
204 #endif
205 static void scheduleStartSignalHandlers (Capability *cap);
206 static void scheduleCheckBlockedThreads (Capability *cap);
207 static void scheduleCheckBlackHoles (Capability *cap);
208 static void scheduleDetectDeadlock (Capability *cap, Task *task);
209 #if defined(GRAN)
210 static StgTSO *scheduleProcessEvent(rtsEvent *event);
211 #endif
212 #if defined(PARALLEL_HASKELL)
213 static StgTSO *scheduleSendPendingMessages(void);
214 static void scheduleActivateSpark(void);
215 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
216 #endif
217 #if defined(PAR) || defined(GRAN)
218 static void scheduleGranParReport(void);
219 #endif
220 static void schedulePostRunThread(void);
221 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
222 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
223                                          StgTSO *t);
224 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
225                                     nat prev_what_next );
226 static void scheduleHandleThreadBlocked( StgTSO *t );
227 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
228                                              StgTSO *t );
229 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
230 static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major,
231                          void (*get_roots)(evac_fn));
232
233 static void unblockThread(Capability *cap, StgTSO *tso);
234 static rtsBool checkBlackHoles(Capability *cap);
235 static void AllRoots(evac_fn evac);
236
237 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
238
239 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
240                         rtsBool stop_at_atomically, StgPtr stop_here);
241
242 static void deleteThread (Capability *cap, StgTSO *tso);
243 static void deleteRunQueue (Capability *cap);
244
245 #ifdef DEBUG
246 static void printThreadBlockage(StgTSO *tso);
247 static void printThreadStatus(StgTSO *tso);
248 void printThreadQueue(StgTSO *tso);
249 #endif
250
251 #if defined(PARALLEL_HASKELL)
252 StgTSO * createSparkThread(rtsSpark spark);
253 StgTSO * activateSpark (rtsSpark spark);  
254 #endif
255
256 #ifdef DEBUG
257 static char *whatNext_strs[] = {
258   "(unknown)",
259   "ThreadRunGHC",
260   "ThreadInterpret",
261   "ThreadKilled",
262   "ThreadRelocated",
263   "ThreadComplete"
264 };
265 #endif
266
267 /* -----------------------------------------------------------------------------
268  * Putting a thread on the run queue: different scheduling policies
269  * -------------------------------------------------------------------------- */
270
271 STATIC_INLINE void
272 addToRunQueue( Capability *cap, StgTSO *t )
273 {
274 #if defined(PARALLEL_HASKELL)
275     if (RtsFlags.ParFlags.doFairScheduling) { 
276         // this does round-robin scheduling; good for concurrency
277         appendToRunQueue(cap,t);
278     } else {
279         // this does unfair scheduling; good for parallelism
280         pushOnRunQueue(cap,t);
281     }
282 #else
283     // this does round-robin scheduling; good for concurrency
284     appendToRunQueue(cap,t);
285 #endif
286 }
287
288 /* ---------------------------------------------------------------------------
289    Main scheduling loop.
290
291    We use round-robin scheduling, each thread returning to the
292    scheduler loop when one of these conditions is detected:
293
294       * out of heap space
295       * timer expires (thread yields)
296       * thread blocks
297       * thread ends
298       * stack overflow
299
300    GRAN version:
301      In a GranSim setup this loop iterates over the global event queue.
302      This revolves around the global event queue, which determines what 
303      to do next. Therefore, it's more complicated than either the 
304      concurrent or the parallel (GUM) setup.
305
306    GUM version:
307      GUM iterates over incoming messages.
308      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
309      and sends out a fish whenever it has nothing to do; in-between
310      doing the actual reductions (shared code below) it processes the
311      incoming messages and deals with delayed operations 
312      (see PendingFetches).
313      This is not the ugliest code you could imagine, but it's bloody close.
314
315    ------------------------------------------------------------------------ */
316
317 static Capability *
318 schedule (Capability *initialCapability, Task *task)
319 {
320   StgTSO *t;
321   Capability *cap;
322   StgThreadReturnCode ret;
323 #if defined(GRAN)
324   rtsEvent *event;
325 #elif defined(PARALLEL_HASKELL)
326   StgTSO *tso;
327   GlobalTaskId pe;
328   rtsBool receivedFinish = rtsFalse;
329 # if defined(DEBUG)
330   nat tp_size, sp_size; // stats only
331 # endif
332 #endif
333   nat prev_what_next;
334   rtsBool ready_to_gc;
335 #if defined(THREADED_RTS)
336   rtsBool first = rtsTrue;
337 #endif
338   
339   cap = initialCapability;
340
341   // Pre-condition: this task owns initialCapability.
342   // The sched_mutex is *NOT* held
343   // NB. on return, we still hold a capability.
344
345   IF_DEBUG(scheduler,
346            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
347                        task, initialCapability);
348       );
349
350   schedulePreLoop();
351
352   // -----------------------------------------------------------
353   // Scheduler loop starts here:
354
355 #if defined(PARALLEL_HASKELL)
356 #define TERMINATION_CONDITION        (!receivedFinish)
357 #elif defined(GRAN)
358 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
359 #else
360 #define TERMINATION_CONDITION        rtsTrue
361 #endif
362
363   while (TERMINATION_CONDITION) {
364
365 #if defined(GRAN)
366       /* Choose the processor with the next event */
367       CurrentProc = event->proc;
368       CurrentTSO = event->tso;
369 #endif
370
371 #if defined(THREADED_RTS)
372       if (first) {
373           // don't yield the first time, we want a chance to run this
374           // thread for a bit, even if there are others banging at the
375           // door.
376           first = rtsFalse;
377           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
378       } else {
379           // Yield the capability to higher-priority tasks if necessary.
380           yieldCapability(&cap, task);
381       }
382 #endif
383       
384 #if defined(THREADED_RTS)
385       schedulePushWork(cap,task);
386 #endif
387
388     // Check whether we have re-entered the RTS from Haskell without
389     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
390     // call).
391     if (cap->in_haskell) {
392           errorBelch("schedule: re-entered unsafely.\n"
393                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
394           stg_exit(EXIT_FAILURE);
395     }
396
397     //
398     // Test for interruption.  If interrupted==rtsTrue, then either
399     // we received a keyboard interrupt (^C), or the scheduler is
400     // trying to shut down all the tasks (shutting_down_scheduler) in
401     // the threaded RTS.
402     //
403     if (interrupted) {
404         deleteRunQueue(cap);
405 #if defined(THREADED_RTS)
406         discardSparksCap(cap);
407 #endif
408         if (shutting_down_scheduler) {
409             IF_DEBUG(scheduler, sched_belch("shutting down"));
410             // If we are a worker, just exit.  If we're a bound thread
411             // then we will exit below when we've removed our TSO from
412             // the run queue.
413             if (task->tso == NULL && emptyRunQueue(cap)) {
414                 return cap;
415             }
416         } else {
417             IF_DEBUG(scheduler, sched_belch("interrupted"));
418         }
419     }
420
421 #if defined(THREADED_RTS)
422     // If the run queue is empty, take a spark and turn it into a thread.
423     {
424         if (emptyRunQueue(cap)) {
425             StgClosure *spark;
426             spark = findSpark(cap);
427             if (spark != NULL) {
428                 IF_DEBUG(scheduler,
429                          sched_belch("turning spark of closure %p into a thread",
430                                      (StgClosure *)spark));
431                 createSparkThread(cap,spark);     
432             }
433         }
434     }
435 #endif // THREADED_RTS
436
437     scheduleStartSignalHandlers(cap);
438
439     // Only check the black holes here if we've nothing else to do.
440     // During normal execution, the black hole list only gets checked
441     // at GC time, to avoid repeatedly traversing this possibly long
442     // list each time around the scheduler.
443     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
444
445     scheduleCheckBlockedThreads(cap);
446
447     scheduleDetectDeadlock(cap,task);
448 #if defined(THREADED_RTS)
449     cap = task->cap;    // reload cap, it might have changed
450 #endif
451
452     // Normally, the only way we can get here with no threads to
453     // run is if a keyboard interrupt received during 
454     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
455     // Additionally, it is not fatal for the
456     // threaded RTS to reach here with no threads to run.
457     //
458     // win32: might be here due to awaitEvent() being abandoned
459     // as a result of a console event having been delivered.
460     if ( emptyRunQueue(cap) ) {
461 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
462         ASSERT(interrupted);
463 #endif
464         continue; // nothing to do
465     }
466
467 #if defined(PARALLEL_HASKELL)
468     scheduleSendPendingMessages();
469     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
470         continue;
471
472 #if defined(SPARKS)
473     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
474 #endif
475
476     /* If we still have no work we need to send a FISH to get a spark
477        from another PE */
478     if (emptyRunQueue(cap)) {
479         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
480         ASSERT(rtsFalse); // should not happen at the moment
481     }
482     // from here: non-empty run queue.
483     //  TODO: merge above case with this, only one call processMessages() !
484     if (PacketsWaiting()) {  /* process incoming messages, if
485                                 any pending...  only in else
486                                 because getRemoteWork waits for
487                                 messages as well */
488         receivedFinish = processMessages();
489     }
490 #endif
491
492 #if defined(GRAN)
493     scheduleProcessEvent(event);
494 #endif
495
496     // 
497     // Get a thread to run
498     //
499     t = popRunQueue(cap);
500
501 #if defined(GRAN) || defined(PAR)
502     scheduleGranParReport(); // some kind of debuging output
503 #else
504     // Sanity check the thread we're about to run.  This can be
505     // expensive if there is lots of thread switching going on...
506     IF_DEBUG(sanity,checkTSO(t));
507 #endif
508
509 #if defined(THREADED_RTS)
510     // Check whether we can run this thread in the current task.
511     // If not, we have to pass our capability to the right task.
512     {
513         Task *bound = t->bound;
514       
515         if (bound) {
516             if (bound == task) {
517                 IF_DEBUG(scheduler,
518                          sched_belch("### Running thread %d in bound thread",
519                                      t->id));
520                 // yes, the Haskell thread is bound to the current native thread
521             } else {
522                 IF_DEBUG(scheduler,
523                          sched_belch("### thread %d bound to another OS thread",
524                                      t->id));
525                 // no, bound to a different Haskell thread: pass to that thread
526                 pushOnRunQueue(cap,t);
527                 continue;
528             }
529         } else {
530             // The thread we want to run is unbound.
531             if (task->tso) { 
532                 IF_DEBUG(scheduler,
533                          sched_belch("### this OS thread cannot run thread %d", t->id));
534                 // no, the current native thread is bound to a different
535                 // Haskell thread, so pass it to any worker thread
536                 pushOnRunQueue(cap,t);
537                 continue; 
538             }
539         }
540     }
541 #endif
542
543     cap->r.rCurrentTSO = t;
544     
545     /* context switches are initiated by the timer signal, unless
546      * the user specified "context switch as often as possible", with
547      * +RTS -C0
548      */
549     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
550         && !emptyThreadQueues(cap)) {
551         context_switch = 1;
552     }
553          
554 run_thread:
555
556     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
557                               (long)t->id, whatNext_strs[t->what_next]));
558
559 #if defined(PROFILING)
560     startHeapProfTimer();
561 #endif
562
563     // ----------------------------------------------------------------------
564     // Run the current thread 
565
566     prev_what_next = t->what_next;
567
568     errno = t->saved_errno;
569     cap->in_haskell = rtsTrue;
570
571     dirtyTSO(t);
572
573     recent_activity = ACTIVITY_YES;
574
575     switch (prev_what_next) {
576         
577     case ThreadKilled:
578     case ThreadComplete:
579         /* Thread already finished, return to scheduler. */
580         ret = ThreadFinished;
581         break;
582         
583     case ThreadRunGHC:
584     {
585         StgRegTable *r;
586         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
587         cap = regTableToCapability(r);
588         ret = r->rRet;
589         break;
590     }
591     
592     case ThreadInterpret:
593         cap = interpretBCO(cap);
594         ret = cap->r.rRet;
595         break;
596         
597     default:
598         barf("schedule: invalid what_next field");
599     }
600
601     cap->in_haskell = rtsFalse;
602
603     // The TSO might have moved, eg. if it re-entered the RTS and a GC
604     // happened.  So find the new location:
605     t = cap->r.rCurrentTSO;
606
607     // We have run some Haskell code: there might be blackhole-blocked
608     // threads to wake up now.
609     // Lock-free test here should be ok, we're just setting a flag.
610     if ( blackhole_queue != END_TSO_QUEUE ) {
611         blackholes_need_checking = rtsTrue;
612     }
613     
614     // And save the current errno in this thread.
615     // XXX: possibly bogus for SMP because this thread might already
616     // be running again, see code below.
617     t->saved_errno = errno;
618
619 #ifdef SMP
620     // If ret is ThreadBlocked, and this Task is bound to the TSO that
621     // blocked, we are in limbo - the TSO is now owned by whatever it
622     // is blocked on, and may in fact already have been woken up,
623     // perhaps even on a different Capability.  It may be the case
624     // that task->cap != cap.  We better yield this Capability
625     // immediately and return to normaility.
626     if (ret == ThreadBlocked) {
627         IF_DEBUG(scheduler,
628                  sched_belch("--<< thread %d (%s) stopped: blocked\n",
629                              t->id, whatNext_strs[t->what_next]));
630         continue;
631     }
632 #endif
633
634     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
635
636     // ----------------------------------------------------------------------
637     
638     // Costs for the scheduler are assigned to CCS_SYSTEM
639 #if defined(PROFILING)
640     stopHeapProfTimer();
641     CCCS = CCS_SYSTEM;
642 #endif
643     
644 #if defined(THREADED_RTS)
645     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
646 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
647     IF_DEBUG(scheduler,debugBelch("sched: "););
648 #endif
649     
650     schedulePostRunThread();
651
652     ready_to_gc = rtsFalse;
653
654     switch (ret) {
655     case HeapOverflow:
656         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
657         break;
658
659     case StackOverflow:
660         scheduleHandleStackOverflow(cap,task,t);
661         break;
662
663     case ThreadYielding:
664         if (scheduleHandleYield(cap, t, prev_what_next)) {
665             // shortcut for switching between compiler/interpreter:
666             goto run_thread; 
667         }
668         break;
669
670     case ThreadBlocked:
671         scheduleHandleThreadBlocked(t);
672         break;
673
674     case ThreadFinished:
675         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
676         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
677         break;
678
679     default:
680       barf("schedule: invalid thread return code %d", (int)ret);
681     }
682
683     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
684     if (ready_to_gc) {
685       scheduleDoGC(cap,task,rtsFalse,GetRoots);
686 #if defined(THREADED_RTS)
687       cap = task->cap;    // reload cap, it might have changed  
688 #endif
689     }
690   } /* end of while() */
691
692   IF_PAR_DEBUG(verbose,
693                debugBelch("== Leaving schedule() after having received Finish\n"));
694 }
695
696 /* ----------------------------------------------------------------------------
697  * Setting up the scheduler loop
698  * ------------------------------------------------------------------------- */
699
700 static void
701 schedulePreLoop(void)
702 {
703 #if defined(GRAN) 
704     /* set up first event to get things going */
705     /* ToDo: assign costs for system setup and init MainTSO ! */
706     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
707               ContinueThread, 
708               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
709     
710     IF_DEBUG(gran,
711              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
712                         CurrentTSO);
713              G_TSO(CurrentTSO, 5));
714     
715     if (RtsFlags.GranFlags.Light) {
716         /* Save current time; GranSim Light only */
717         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
718     }      
719 #endif
720 }
721
722 /* -----------------------------------------------------------------------------
723  * schedulePushWork()
724  *
725  * Push work to other Capabilities if we have some.
726  * -------------------------------------------------------------------------- */
727
728 #if defined(THREADED_RTS)
729 static void
730 schedulePushWork(Capability *cap USED_IF_THREADS, 
731                  Task *task      USED_IF_THREADS)
732 {
733     Capability *free_caps[n_capabilities], *cap0;
734     nat i, n_free_caps;
735
736     // Check whether we have more threads on our run queue, or sparks
737     // in our pool, that we could hand to another Capability.
738     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
739         && sparkPoolSizeCap(cap) < 2) {
740         return;
741     }
742
743     // First grab as many free Capabilities as we can.
744     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
745         cap0 = &capabilities[i];
746         if (cap != cap0 && tryGrabCapability(cap0,task)) {
747             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
748                 // it already has some work, we just grabbed it at 
749                 // the wrong moment.  Or maybe it's deadlocked!
750                 releaseCapability(cap0);
751             } else {
752                 free_caps[n_free_caps++] = cap0;
753             }
754         }
755     }
756
757     // we now have n_free_caps free capabilities stashed in
758     // free_caps[].  Share our run queue equally with them.  This is
759     // probably the simplest thing we could do; improvements we might
760     // want to do include:
761     //
762     //   - giving high priority to moving relatively new threads, on 
763     //     the gournds that they haven't had time to build up a
764     //     working set in the cache on this CPU/Capability.
765     //
766     //   - giving low priority to moving long-lived threads
767
768     if (n_free_caps > 0) {
769         StgTSO *prev, *t, *next;
770         rtsBool pushed_to_all;
771
772         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
773
774         i = 0;
775         pushed_to_all = rtsFalse;
776
777         if (cap->run_queue_hd != END_TSO_QUEUE) {
778             prev = cap->run_queue_hd;
779             t = prev->link;
780             prev->link = END_TSO_QUEUE;
781             for (; t != END_TSO_QUEUE; t = next) {
782                 next = t->link;
783                 t->link = END_TSO_QUEUE;
784                 if (t->what_next == ThreadRelocated
785                     || t->bound == task) { // don't move my bound thread
786                     prev->link = t;
787                     prev = t;
788                 } else if (i == n_free_caps) {
789                     pushed_to_all = rtsTrue;
790                     i = 0;
791                     // keep one for us
792                     prev->link = t;
793                     prev = t;
794                 } else {
795                     IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
796                     appendToRunQueue(free_caps[i],t);
797                     if (t->bound) { t->bound->cap = free_caps[i]; }
798                     i++;
799                 }
800             }
801             cap->run_queue_tl = prev;
802         }
803
804         // If there are some free capabilities that we didn't push any
805         // threads to, then try to push a spark to each one.
806         if (!pushed_to_all) {
807             StgClosure *spark;
808             // i is the next free capability to push to
809             for (; i < n_free_caps; i++) {
810                 if (emptySparkPoolCap(free_caps[i])) {
811                     spark = findSpark(cap);
812                     if (spark != NULL) {
813                         IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
814                         newSpark(&(free_caps[i]->r), spark);
815                     }
816                 }
817             }
818         }
819
820         // release the capabilities
821         for (i = 0; i < n_free_caps; i++) {
822             task->cap = free_caps[i];
823             releaseCapability(free_caps[i]);
824         }
825     }
826     task->cap = cap; // reset to point to our Capability.
827 }
828 #endif
829
830 /* ----------------------------------------------------------------------------
831  * Start any pending signal handlers
832  * ------------------------------------------------------------------------- */
833
834 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
835 static void
836 scheduleStartSignalHandlers(Capability *cap)
837 {
838     if (signals_pending()) { // safe outside the lock
839         startSignalHandlers(cap);
840     }
841 }
842 #else
843 static void
844 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
845 {
846 }
847 #endif
848
849 /* ----------------------------------------------------------------------------
850  * Check for blocked threads that can be woken up.
851  * ------------------------------------------------------------------------- */
852
853 static void
854 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
855 {
856 #if !defined(THREADED_RTS)
857     //
858     // Check whether any waiting threads need to be woken up.  If the
859     // run queue is empty, and there are no other tasks running, we
860     // can wait indefinitely for something to happen.
861     //
862     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
863     {
864         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
865     }
866 #endif
867 }
868
869
870 /* ----------------------------------------------------------------------------
871  * Check for threads blocked on BLACKHOLEs that can be woken up
872  * ------------------------------------------------------------------------- */
873 static void
874 scheduleCheckBlackHoles (Capability *cap)
875 {
876     if ( blackholes_need_checking ) // check without the lock first
877     {
878         ACQUIRE_LOCK(&sched_mutex);
879         if ( blackholes_need_checking ) {
880             checkBlackHoles(cap);
881             blackholes_need_checking = rtsFalse;
882         }
883         RELEASE_LOCK(&sched_mutex);
884     }
885 }
886
887 /* ----------------------------------------------------------------------------
888  * Detect deadlock conditions and attempt to resolve them.
889  * ------------------------------------------------------------------------- */
890
891 static void
892 scheduleDetectDeadlock (Capability *cap, Task *task)
893 {
894
895 #if defined(PARALLEL_HASKELL)
896     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
897     return;
898 #endif
899
900     /* 
901      * Detect deadlock: when we have no threads to run, there are no
902      * threads blocked, waiting for I/O, or sleeping, and all the
903      * other tasks are waiting for work, we must have a deadlock of
904      * some description.
905      */
906     if ( emptyThreadQueues(cap) )
907     {
908 #if defined(THREADED_RTS)
909         /* 
910          * In the threaded RTS, we only check for deadlock if there
911          * has been no activity in a complete timeslice.  This means
912          * we won't eagerly start a full GC just because we don't have
913          * any threads to run currently.
914          */
915         if (recent_activity != ACTIVITY_INACTIVE) return;
916 #endif
917
918         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
919
920         // Garbage collection can release some new threads due to
921         // either (a) finalizers or (b) threads resurrected because
922         // they are unreachable and will therefore be sent an
923         // exception.  Any threads thus released will be immediately
924         // runnable.
925         scheduleDoGC( cap, task, rtsTrue/*force  major GC*/, GetRoots );
926 #if defined(THREADED_RTS)
927         cap = task->cap;    // reload cap, it might have changed
928 #endif
929
930         recent_activity = ACTIVITY_DONE_GC;
931         
932         if ( !emptyRunQueue(cap) ) return;
933
934 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
935         /* If we have user-installed signal handlers, then wait
936          * for signals to arrive rather then bombing out with a
937          * deadlock.
938          */
939         if ( anyUserHandlers() ) {
940             IF_DEBUG(scheduler, 
941                      sched_belch("still deadlocked, waiting for signals..."));
942
943             awaitUserSignals();
944
945             if (signals_pending()) {
946                 startSignalHandlers(cap);
947             }
948
949             // either we have threads to run, or we were interrupted:
950             ASSERT(!emptyRunQueue(cap) || interrupted);
951         }
952 #endif
953
954 #if !defined(THREADED_RTS)
955         /* Probably a real deadlock.  Send the current main thread the
956          * Deadlock exception.
957          */
958         if (task->tso) {
959             switch (task->tso->why_blocked) {
960             case BlockedOnSTM:
961             case BlockedOnBlackHole:
962             case BlockedOnException:
963             case BlockedOnMVar:
964                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
965                 return;
966             default:
967                 barf("deadlock: main thread blocked in a strange way");
968             }
969         }
970         return;
971 #endif
972     }
973 }
974
975 /* ----------------------------------------------------------------------------
976  * Process an event (GRAN only)
977  * ------------------------------------------------------------------------- */
978
979 #if defined(GRAN)
980 static StgTSO *
981 scheduleProcessEvent(rtsEvent *event)
982 {
983     StgTSO *t;
984
985     if (RtsFlags.GranFlags.Light)
986       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
987
988     /* adjust time based on time-stamp */
989     if (event->time > CurrentTime[CurrentProc] &&
990         event->evttype != ContinueThread)
991       CurrentTime[CurrentProc] = event->time;
992     
993     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
994     if (!RtsFlags.GranFlags.Light)
995       handleIdlePEs();
996
997     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
998
999     /* main event dispatcher in GranSim */
1000     switch (event->evttype) {
1001       /* Should just be continuing execution */
1002     case ContinueThread:
1003       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1004       /* ToDo: check assertion
1005       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1006              run_queue_hd != END_TSO_QUEUE);
1007       */
1008       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1009       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1010           procStatus[CurrentProc]==Fetching) {
1011         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1012               CurrentTSO->id, CurrentTSO, CurrentProc);
1013         goto next_thread;
1014       } 
1015       /* Ignore ContinueThreads for completed threads */
1016       if (CurrentTSO->what_next == ThreadComplete) {
1017         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1018               CurrentTSO->id, CurrentTSO, CurrentProc);
1019         goto next_thread;
1020       } 
1021       /* Ignore ContinueThreads for threads that are being migrated */
1022       if (PROCS(CurrentTSO)==Nowhere) { 
1023         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1024               CurrentTSO->id, CurrentTSO, CurrentProc);
1025         goto next_thread;
1026       }
1027       /* The thread should be at the beginning of the run queue */
1028       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1029         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1030               CurrentTSO->id, CurrentTSO, CurrentProc);
1031         break; // run the thread anyway
1032       }
1033       /*
1034       new_event(proc, proc, CurrentTime[proc],
1035                 FindWork,
1036                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1037       goto next_thread; 
1038       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1039       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1040
1041     case FetchNode:
1042       do_the_fetchnode(event);
1043       goto next_thread;             /* handle next event in event queue  */
1044       
1045     case GlobalBlock:
1046       do_the_globalblock(event);
1047       goto next_thread;             /* handle next event in event queue  */
1048       
1049     case FetchReply:
1050       do_the_fetchreply(event);
1051       goto next_thread;             /* handle next event in event queue  */
1052       
1053     case UnblockThread:   /* Move from the blocked queue to the tail of */
1054       do_the_unblock(event);
1055       goto next_thread;             /* handle next event in event queue  */
1056       
1057     case ResumeThread:  /* Move from the blocked queue to the tail of */
1058       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1059       event->tso->gran.blocktime += 
1060         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1061       do_the_startthread(event);
1062       goto next_thread;             /* handle next event in event queue  */
1063       
1064     case StartThread:
1065       do_the_startthread(event);
1066       goto next_thread;             /* handle next event in event queue  */
1067       
1068     case MoveThread:
1069       do_the_movethread(event);
1070       goto next_thread;             /* handle next event in event queue  */
1071       
1072     case MoveSpark:
1073       do_the_movespark(event);
1074       goto next_thread;             /* handle next event in event queue  */
1075       
1076     case FindWork:
1077       do_the_findwork(event);
1078       goto next_thread;             /* handle next event in event queue  */
1079       
1080     default:
1081       barf("Illegal event type %u\n", event->evttype);
1082     }  /* switch */
1083     
1084     /* This point was scheduler_loop in the old RTS */
1085
1086     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1087
1088     TimeOfLastEvent = CurrentTime[CurrentProc];
1089     TimeOfNextEvent = get_time_of_next_event();
1090     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1091     // CurrentTSO = ThreadQueueHd;
1092
1093     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1094                          TimeOfNextEvent));
1095
1096     if (RtsFlags.GranFlags.Light) 
1097       GranSimLight_leave_system(event, &ActiveTSO); 
1098
1099     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1100
1101     IF_DEBUG(gran, 
1102              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1103
1104     /* in a GranSim setup the TSO stays on the run queue */
1105     t = CurrentTSO;
1106     /* Take a thread from the run queue. */
1107     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1108
1109     IF_DEBUG(gran, 
1110              debugBelch("GRAN: About to run current thread, which is\n");
1111              G_TSO(t,5));
1112
1113     context_switch = 0; // turned on via GranYield, checking events and time slice
1114
1115     IF_DEBUG(gran, 
1116              DumpGranEvent(GR_SCHEDULE, t));
1117
1118     procStatus[CurrentProc] = Busy;
1119 }
1120 #endif // GRAN
1121
1122 /* ----------------------------------------------------------------------------
1123  * Send pending messages (PARALLEL_HASKELL only)
1124  * ------------------------------------------------------------------------- */
1125
1126 #if defined(PARALLEL_HASKELL)
1127 static StgTSO *
1128 scheduleSendPendingMessages(void)
1129 {
1130     StgSparkPool *pool;
1131     rtsSpark spark;
1132     StgTSO *t;
1133
1134 # if defined(PAR) // global Mem.Mgmt., omit for now
1135     if (PendingFetches != END_BF_QUEUE) {
1136         processFetches();
1137     }
1138 # endif
1139     
1140     if (RtsFlags.ParFlags.BufferTime) {
1141         // if we use message buffering, we must send away all message
1142         // packets which have become too old...
1143         sendOldBuffers(); 
1144     }
1145 }
1146 #endif
1147
1148 /* ----------------------------------------------------------------------------
1149  * Activate spark threads (PARALLEL_HASKELL only)
1150  * ------------------------------------------------------------------------- */
1151
1152 #if defined(PARALLEL_HASKELL)
1153 static void
1154 scheduleActivateSpark(void)
1155 {
1156 #if defined(SPARKS)
1157   ASSERT(emptyRunQueue());
1158 /* We get here if the run queue is empty and want some work.
1159    We try to turn a spark into a thread, and add it to the run queue,
1160    from where it will be picked up in the next iteration of the scheduler
1161    loop.
1162 */
1163
1164       /* :-[  no local threads => look out for local sparks */
1165       /* the spark pool for the current PE */
1166       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1167       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1168           pool->hd < pool->tl) {
1169         /* 
1170          * ToDo: add GC code check that we really have enough heap afterwards!!
1171          * Old comment:
1172          * If we're here (no runnable threads) and we have pending
1173          * sparks, we must have a space problem.  Get enough space
1174          * to turn one of those pending sparks into a
1175          * thread... 
1176          */
1177
1178         spark = findSpark(rtsFalse);            /* get a spark */
1179         if (spark != (rtsSpark) NULL) {
1180           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1181           IF_PAR_DEBUG(fish, // schedule,
1182                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1183                              tso->id, tso, advisory_thread_count));
1184
1185           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1186             IF_PAR_DEBUG(fish, // schedule,
1187                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1188                             spark));
1189             return rtsFalse; /* failed to generate a thread */
1190           }                  /* otherwise fall through & pick-up new tso */
1191         } else {
1192           IF_PAR_DEBUG(fish, // schedule,
1193                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1194                              spark_queue_len(pool)));
1195           return rtsFalse;  /* failed to generate a thread */
1196         }
1197         return rtsTrue;  /* success in generating a thread */
1198   } else { /* no more threads permitted or pool empty */
1199     return rtsFalse;  /* failed to generateThread */
1200   }
1201 #else
1202   tso = NULL; // avoid compiler warning only
1203   return rtsFalse;  /* dummy in non-PAR setup */
1204 #endif // SPARKS
1205 }
1206 #endif // PARALLEL_HASKELL
1207
1208 /* ----------------------------------------------------------------------------
1209  * Get work from a remote node (PARALLEL_HASKELL only)
1210  * ------------------------------------------------------------------------- */
1211     
1212 #if defined(PARALLEL_HASKELL)
1213 static rtsBool
1214 scheduleGetRemoteWork(rtsBool *receivedFinish)
1215 {
1216   ASSERT(emptyRunQueue());
1217
1218   if (RtsFlags.ParFlags.BufferTime) {
1219         IF_PAR_DEBUG(verbose, 
1220                 debugBelch("...send all pending data,"));
1221         {
1222           nat i;
1223           for (i=1; i<=nPEs; i++)
1224             sendImmediately(i); // send all messages away immediately
1225         }
1226   }
1227 # ifndef SPARKS
1228         //++EDEN++ idle() , i.e. send all buffers, wait for work
1229         // suppress fishing in EDEN... just look for incoming messages
1230         // (blocking receive)
1231   IF_PAR_DEBUG(verbose, 
1232                debugBelch("...wait for incoming messages...\n"));
1233   *receivedFinish = processMessages(); // blocking receive...
1234
1235         // and reenter scheduling loop after having received something
1236         // (return rtsFalse below)
1237
1238 # else /* activate SPARKS machinery */
1239 /* We get here, if we have no work, tried to activate a local spark, but still
1240    have no work. We try to get a remote spark, by sending a FISH message.
1241    Thread migration should be added here, and triggered when a sequence of 
1242    fishes returns without work. */
1243         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1244
1245       /* =8-[  no local sparks => look for work on other PEs */
1246         /*
1247          * We really have absolutely no work.  Send out a fish
1248          * (there may be some out there already), and wait for
1249          * something to arrive.  We clearly can't run any threads
1250          * until a SCHEDULE or RESUME arrives, and so that's what
1251          * we're hoping to see.  (Of course, we still have to
1252          * respond to other types of messages.)
1253          */
1254         rtsTime now = msTime() /*CURRENT_TIME*/;
1255         IF_PAR_DEBUG(verbose, 
1256                      debugBelch("--  now=%ld\n", now));
1257         IF_PAR_DEBUG(fish, // verbose,
1258              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1259                  (last_fish_arrived_at!=0 &&
1260                   last_fish_arrived_at+delay > now)) {
1261                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1262                      now, last_fish_arrived_at+delay, 
1263                      last_fish_arrived_at,
1264                      delay);
1265              });
1266   
1267         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1268             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1269           if (last_fish_arrived_at==0 ||
1270               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1271             /* outstandingFishes is set in sendFish, processFish;
1272                avoid flooding system with fishes via delay */
1273     next_fish_to_send_at = 0;  
1274   } else {
1275     /* ToDo: this should be done in the main scheduling loop to avoid the
1276              busy wait here; not so bad if fish delay is very small  */
1277     int iq = 0; // DEBUGGING -- HWL
1278     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1279     /* send a fish when ready, but process messages that arrive in the meantime */
1280     do {
1281       if (PacketsWaiting()) {
1282         iq++; // DEBUGGING
1283         *receivedFinish = processMessages();
1284       }
1285       now = msTime();
1286     } while (!*receivedFinish || now<next_fish_to_send_at);
1287     // JB: This means the fish could become obsolete, if we receive
1288     // work. Better check for work again? 
1289     // last line: while (!receivedFinish || !haveWork || now<...)
1290     // next line: if (receivedFinish || haveWork )
1291
1292     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1293       return rtsFalse;  // NB: this will leave scheduler loop
1294                         // immediately after return!
1295                           
1296     IF_PAR_DEBUG(fish, // verbose,
1297                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1298
1299   }
1300
1301     // JB: IMHO, this should all be hidden inside sendFish(...)
1302     /* pe = choosePE(); 
1303        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1304                 NEW_FISH_HUNGER);
1305
1306     // Global statistics: count no. of fishes
1307     if (RtsFlags.ParFlags.ParStats.Global &&
1308          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1309            globalParStats.tot_fish_mess++;
1310            }
1311     */ 
1312
1313   /* delayed fishes must have been sent by now! */
1314   next_fish_to_send_at = 0;  
1315   }
1316       
1317   *receivedFinish = processMessages();
1318 # endif /* SPARKS */
1319
1320  return rtsFalse;
1321  /* NB: this function always returns rtsFalse, meaning the scheduler
1322     loop continues with the next iteration; 
1323     rationale: 
1324       return code means success in finding work; we enter this function
1325       if there is no local work, thus have to send a fish which takes
1326       time until it arrives with work; in the meantime we should process
1327       messages in the main loop;
1328  */
1329 }
1330 #endif // PARALLEL_HASKELL
1331
1332 /* ----------------------------------------------------------------------------
1333  * PAR/GRAN: Report stats & debugging info(?)
1334  * ------------------------------------------------------------------------- */
1335
1336 #if defined(PAR) || defined(GRAN)
1337 static void
1338 scheduleGranParReport(void)
1339 {
1340   ASSERT(run_queue_hd != END_TSO_QUEUE);
1341
1342   /* Take a thread from the run queue, if we have work */
1343   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1344
1345     /* If this TSO has got its outport closed in the meantime, 
1346      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1347      * It has to be marked as TH_DEAD for this purpose.
1348      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1349
1350 JB: TODO: investigate wether state change field could be nuked
1351      entirely and replaced by the normal tso state (whatnext
1352      field). All we want to do is to kill tsos from outside.
1353      */
1354
1355     /* ToDo: write something to the log-file
1356     if (RTSflags.ParFlags.granSimStats && !sameThread)
1357         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1358
1359     CurrentTSO = t;
1360     */
1361     /* the spark pool for the current PE */
1362     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1363
1364     IF_DEBUG(scheduler, 
1365              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1366                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1367
1368     IF_PAR_DEBUG(fish,
1369              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1370                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1371
1372     if (RtsFlags.ParFlags.ParStats.Full && 
1373         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1374         (emitSchedule || // forced emit
1375          (t && LastTSO && t->id != LastTSO->id))) {
1376       /* 
1377          we are running a different TSO, so write a schedule event to log file
1378          NB: If we use fair scheduling we also have to write  a deschedule 
1379              event for LastTSO; with unfair scheduling we know that the
1380              previous tso has blocked whenever we switch to another tso, so
1381              we don't need it in GUM for now
1382       */
1383       IF_PAR_DEBUG(fish, // schedule,
1384                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1385
1386       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1387                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1388       emitSchedule = rtsFalse;
1389     }
1390 }     
1391 #endif
1392
1393 /* ----------------------------------------------------------------------------
1394  * After running a thread...
1395  * ------------------------------------------------------------------------- */
1396
1397 static void
1398 schedulePostRunThread(void)
1399 {
1400 #if defined(PAR)
1401     /* HACK 675: if the last thread didn't yield, make sure to print a 
1402        SCHEDULE event to the log file when StgRunning the next thread, even
1403        if it is the same one as before */
1404     LastTSO = t; 
1405     TimeOfLastYield = CURRENT_TIME;
1406 #endif
1407
1408   /* some statistics gathering in the parallel case */
1409
1410 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1411   switch (ret) {
1412     case HeapOverflow:
1413 # if defined(GRAN)
1414       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1415       globalGranStats.tot_heapover++;
1416 # elif defined(PAR)
1417       globalParStats.tot_heapover++;
1418 # endif
1419       break;
1420
1421      case StackOverflow:
1422 # if defined(GRAN)
1423       IF_DEBUG(gran, 
1424                DumpGranEvent(GR_DESCHEDULE, t));
1425       globalGranStats.tot_stackover++;
1426 # elif defined(PAR)
1427       // IF_DEBUG(par, 
1428       // DumpGranEvent(GR_DESCHEDULE, t);
1429       globalParStats.tot_stackover++;
1430 # endif
1431       break;
1432
1433     case ThreadYielding:
1434 # if defined(GRAN)
1435       IF_DEBUG(gran, 
1436                DumpGranEvent(GR_DESCHEDULE, t));
1437       globalGranStats.tot_yields++;
1438 # elif defined(PAR)
1439       // IF_DEBUG(par, 
1440       // DumpGranEvent(GR_DESCHEDULE, t);
1441       globalParStats.tot_yields++;
1442 # endif
1443       break; 
1444
1445     case ThreadBlocked:
1446 # if defined(GRAN)
1447       IF_DEBUG(scheduler,
1448                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1449                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1450                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1451                if (t->block_info.closure!=(StgClosure*)NULL)
1452                  print_bq(t->block_info.closure);
1453                debugBelch("\n"));
1454
1455       // ??? needed; should emit block before
1456       IF_DEBUG(gran, 
1457                DumpGranEvent(GR_DESCHEDULE, t)); 
1458       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1459       /*
1460         ngoq Dogh!
1461       ASSERT(procStatus[CurrentProc]==Busy || 
1462               ((procStatus[CurrentProc]==Fetching) && 
1463               (t->block_info.closure!=(StgClosure*)NULL)));
1464       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1465           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1466             procStatus[CurrentProc]==Fetching)) 
1467         procStatus[CurrentProc] = Idle;
1468       */
1469 # elif defined(PAR)
1470 //++PAR++  blockThread() writes the event (change?)
1471 # endif
1472     break;
1473
1474   case ThreadFinished:
1475     break;
1476
1477   default:
1478     barf("parGlobalStats: unknown return code");
1479     break;
1480     }
1481 #endif
1482 }
1483
1484 /* -----------------------------------------------------------------------------
1485  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1486  * -------------------------------------------------------------------------- */
1487
1488 static rtsBool
1489 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1490 {
1491     // did the task ask for a large block?
1492     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1493         // if so, get one and push it on the front of the nursery.
1494         bdescr *bd;
1495         lnat blocks;
1496         
1497         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1498         
1499         IF_DEBUG(scheduler,
1500                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1501                             (long)t->id, whatNext_strs[t->what_next], blocks));
1502         
1503         // don't do this if the nursery is (nearly) full, we'll GC first.
1504         if (cap->r.rCurrentNursery->link != NULL ||
1505             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1506                                                // if the nursery has only one block.
1507             
1508             ACQUIRE_SM_LOCK
1509             bd = allocGroup( blocks );
1510             RELEASE_SM_LOCK
1511             cap->r.rNursery->n_blocks += blocks;
1512             
1513             // link the new group into the list
1514             bd->link = cap->r.rCurrentNursery;
1515             bd->u.back = cap->r.rCurrentNursery->u.back;
1516             if (cap->r.rCurrentNursery->u.back != NULL) {
1517                 cap->r.rCurrentNursery->u.back->link = bd;
1518             } else {
1519 #if !defined(THREADED_RTS)
1520                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1521                        g0s0 == cap->r.rNursery);
1522 #endif
1523                 cap->r.rNursery->blocks = bd;
1524             }             
1525             cap->r.rCurrentNursery->u.back = bd;
1526             
1527             // initialise it as a nursery block.  We initialise the
1528             // step, gen_no, and flags field of *every* sub-block in
1529             // this large block, because this is easier than making
1530             // sure that we always find the block head of a large
1531             // block whenever we call Bdescr() (eg. evacuate() and
1532             // isAlive() in the GC would both have to do this, at
1533             // least).
1534             { 
1535                 bdescr *x;
1536                 for (x = bd; x < bd + blocks; x++) {
1537                     x->step = cap->r.rNursery;
1538                     x->gen_no = 0;
1539                     x->flags = 0;
1540                 }
1541             }
1542             
1543             // This assert can be a killer if the app is doing lots
1544             // of large block allocations.
1545             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1546             
1547             // now update the nursery to point to the new block
1548             cap->r.rCurrentNursery = bd;
1549             
1550             // we might be unlucky and have another thread get on the
1551             // run queue before us and steal the large block, but in that
1552             // case the thread will just end up requesting another large
1553             // block.
1554             pushOnRunQueue(cap,t);
1555             return rtsFalse;  /* not actually GC'ing */
1556         }
1557     }
1558     
1559     IF_DEBUG(scheduler,
1560              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1561                         (long)t->id, whatNext_strs[t->what_next]));
1562 #if defined(GRAN)
1563     ASSERT(!is_on_queue(t,CurrentProc));
1564 #elif defined(PARALLEL_HASKELL)
1565     /* Currently we emit a DESCHEDULE event before GC in GUM.
1566        ToDo: either add separate event to distinguish SYSTEM time from rest
1567        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1568     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1569         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1570                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1571         emitSchedule = rtsTrue;
1572     }
1573 #endif
1574       
1575     pushOnRunQueue(cap,t);
1576     return rtsTrue;
1577     /* actual GC is done at the end of the while loop in schedule() */
1578 }
1579
1580 /* -----------------------------------------------------------------------------
1581  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1582  * -------------------------------------------------------------------------- */
1583
1584 static void
1585 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1586 {
1587     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1588                                   (long)t->id, whatNext_strs[t->what_next]));
1589     /* just adjust the stack for this thread, then pop it back
1590      * on the run queue.
1591      */
1592     { 
1593         /* enlarge the stack */
1594         StgTSO *new_t = threadStackOverflow(cap, t);
1595         
1596         /* The TSO attached to this Task may have moved, so update the
1597          * pointer to it.
1598          */
1599         if (task->tso == t) {
1600             task->tso = new_t;
1601         }
1602         pushOnRunQueue(cap,new_t);
1603     }
1604 }
1605
1606 /* -----------------------------------------------------------------------------
1607  * Handle a thread that returned to the scheduler with ThreadYielding
1608  * -------------------------------------------------------------------------- */
1609
1610 static rtsBool
1611 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1612 {
1613     // Reset the context switch flag.  We don't do this just before
1614     // running the thread, because that would mean we would lose ticks
1615     // during GC, which can lead to unfair scheduling (a thread hogs
1616     // the CPU because the tick always arrives during GC).  This way
1617     // penalises threads that do a lot of allocation, but that seems
1618     // better than the alternative.
1619     context_switch = 0;
1620     
1621     /* put the thread back on the run queue.  Then, if we're ready to
1622      * GC, check whether this is the last task to stop.  If so, wake
1623      * up the GC thread.  getThread will block during a GC until the
1624      * GC is finished.
1625      */
1626     IF_DEBUG(scheduler,
1627              if (t->what_next != prev_what_next) {
1628                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1629                             (long)t->id, whatNext_strs[t->what_next]);
1630              } else {
1631                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1632                             (long)t->id, whatNext_strs[t->what_next]);
1633              }
1634         );
1635     
1636     IF_DEBUG(sanity,
1637              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1638              checkTSO(t));
1639     ASSERT(t->link == END_TSO_QUEUE);
1640     
1641     // Shortcut if we're just switching evaluators: don't bother
1642     // doing stack squeezing (which can be expensive), just run the
1643     // thread.
1644     if (t->what_next != prev_what_next) {
1645         return rtsTrue;
1646     }
1647     
1648 #if defined(GRAN)
1649     ASSERT(!is_on_queue(t,CurrentProc));
1650       
1651     IF_DEBUG(sanity,
1652              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1653              checkThreadQsSanity(rtsTrue));
1654
1655 #endif
1656
1657     addToRunQueue(cap,t);
1658
1659 #if defined(GRAN)
1660     /* add a ContinueThread event to actually process the thread */
1661     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1662               ContinueThread,
1663               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1664     IF_GRAN_DEBUG(bq, 
1665                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1666                   G_EVENTQ(0);
1667                   G_CURR_THREADQ(0));
1668 #endif
1669     return rtsFalse;
1670 }
1671
1672 /* -----------------------------------------------------------------------------
1673  * Handle a thread that returned to the scheduler with ThreadBlocked
1674  * -------------------------------------------------------------------------- */
1675
1676 static void
1677 scheduleHandleThreadBlocked( StgTSO *t
1678 #if !defined(GRAN) && !defined(DEBUG)
1679     STG_UNUSED
1680 #endif
1681     )
1682 {
1683 #if defined(GRAN)
1684     IF_DEBUG(scheduler,
1685              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1686                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1687              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1688     
1689     // ??? needed; should emit block before
1690     IF_DEBUG(gran, 
1691              DumpGranEvent(GR_DESCHEDULE, t)); 
1692     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1693     /*
1694       ngoq Dogh!
1695       ASSERT(procStatus[CurrentProc]==Busy || 
1696       ((procStatus[CurrentProc]==Fetching) && 
1697       (t->block_info.closure!=(StgClosure*)NULL)));
1698       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1699       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1700       procStatus[CurrentProc]==Fetching)) 
1701       procStatus[CurrentProc] = Idle;
1702     */
1703 #elif defined(PAR)
1704     IF_DEBUG(scheduler,
1705              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1706                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1707     IF_PAR_DEBUG(bq,
1708                  
1709                  if (t->block_info.closure!=(StgClosure*)NULL) 
1710                  print_bq(t->block_info.closure));
1711     
1712     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1713     blockThread(t);
1714     
1715     /* whatever we schedule next, we must log that schedule */
1716     emitSchedule = rtsTrue;
1717     
1718 #else /* !GRAN */
1719
1720       // We don't need to do anything.  The thread is blocked, and it
1721       // has tidied up its stack and placed itself on whatever queue
1722       // it needs to be on.
1723
1724 #if !defined(THREADED_RTS)
1725     ASSERT(t->why_blocked != NotBlocked);
1726              // This might not be true under THREADED_RTS: we don't have
1727              // exclusive access to this TSO, so someone might have
1728              // woken it up by now.  This actually happens: try
1729              // conc023 +RTS -N2.
1730 #endif
1731
1732     IF_DEBUG(scheduler,
1733              debugBelch("--<< thread %d (%s) stopped: ", 
1734                         t->id, whatNext_strs[t->what_next]);
1735              printThreadBlockage(t);
1736              debugBelch("\n"));
1737     
1738     /* Only for dumping event to log file 
1739        ToDo: do I need this in GranSim, too?
1740        blockThread(t);
1741     */
1742 #endif
1743 }
1744
1745 /* -----------------------------------------------------------------------------
1746  * Handle a thread that returned to the scheduler with ThreadFinished
1747  * -------------------------------------------------------------------------- */
1748
1749 static rtsBool
1750 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1751 {
1752     /* Need to check whether this was a main thread, and if so,
1753      * return with the return value.
1754      *
1755      * We also end up here if the thread kills itself with an
1756      * uncaught exception, see Exception.cmm.
1757      */
1758     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1759                                   t->id, whatNext_strs[t->what_next]));
1760
1761 #if defined(GRAN)
1762       endThread(t, CurrentProc); // clean-up the thread
1763 #elif defined(PARALLEL_HASKELL)
1764       /* For now all are advisory -- HWL */
1765       //if(t->priority==AdvisoryPriority) ??
1766       advisory_thread_count--; // JB: Caution with this counter, buggy!
1767       
1768 # if defined(DIST)
1769       if(t->dist.priority==RevalPriority)
1770         FinishReval(t);
1771 # endif
1772     
1773 # if defined(EDENOLD)
1774       // the thread could still have an outport... (BUG)
1775       if (t->eden.outport != -1) {
1776       // delete the outport for the tso which has finished...
1777         IF_PAR_DEBUG(eden_ports,
1778                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1779                               t->eden.outport, t->id));
1780         deleteOPT(t);
1781       }
1782       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1783       if (t->eden.epid != -1) {
1784         IF_PAR_DEBUG(eden_ports,
1785                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1786                            t->id, t->eden.epid));
1787         removeTSOfromProcess(t);
1788       }
1789 # endif 
1790
1791 # if defined(PAR)
1792       if (RtsFlags.ParFlags.ParStats.Full &&
1793           !RtsFlags.ParFlags.ParStats.Suppressed) 
1794         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1795
1796       //  t->par only contains statistics: left out for now...
1797       IF_PAR_DEBUG(fish,
1798                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1799                               t->id,t,t->par.sparkname));
1800 # endif
1801 #endif // PARALLEL_HASKELL
1802
1803       //
1804       // Check whether the thread that just completed was a bound
1805       // thread, and if so return with the result.  
1806       //
1807       // There is an assumption here that all thread completion goes
1808       // through this point; we need to make sure that if a thread
1809       // ends up in the ThreadKilled state, that it stays on the run
1810       // queue so it can be dealt with here.
1811       //
1812
1813       if (t->bound) {
1814
1815           if (t->bound != task) {
1816 #if !defined(THREADED_RTS)
1817               // Must be a bound thread that is not the topmost one.  Leave
1818               // it on the run queue until the stack has unwound to the
1819               // point where we can deal with this.  Leaving it on the run
1820               // queue also ensures that the garbage collector knows about
1821               // this thread and its return value (it gets dropped from the
1822               // all_threads list so there's no other way to find it).
1823               appendToRunQueue(cap,t);
1824               return rtsFalse;
1825 #else
1826               // this cannot happen in the threaded RTS, because a
1827               // bound thread can only be run by the appropriate Task.
1828               barf("finished bound thread that isn't mine");
1829 #endif
1830           }
1831
1832           ASSERT(task->tso == t);
1833
1834           if (t->what_next == ThreadComplete) {
1835               if (task->ret) {
1836                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1837                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1838               }
1839               task->stat = Success;
1840           } else {
1841               if (task->ret) {
1842                   *(task->ret) = NULL;
1843               }
1844               if (interrupted) {
1845                   task->stat = Interrupted;
1846               } else {
1847                   task->stat = Killed;
1848               }
1849           }
1850 #ifdef DEBUG
1851           removeThreadLabel((StgWord)task->tso->id);
1852 #endif
1853           return rtsTrue; // tells schedule() to return
1854       }
1855
1856       return rtsFalse;
1857 }
1858
1859 /* -----------------------------------------------------------------------------
1860  * Perform a heap census, if PROFILING
1861  * -------------------------------------------------------------------------- */
1862
1863 static rtsBool
1864 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1865 {
1866 #if defined(PROFILING)
1867     // When we have +RTS -i0 and we're heap profiling, do a census at
1868     // every GC.  This lets us get repeatable runs for debugging.
1869     if (performHeapProfile ||
1870         (RtsFlags.ProfFlags.profileInterval==0 &&
1871          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1872         GarbageCollect(GetRoots, rtsTrue);
1873         heapCensus();
1874         performHeapProfile = rtsFalse;
1875         return rtsTrue;  // true <=> we already GC'd
1876     }
1877 #endif
1878     return rtsFalse;
1879 }
1880
1881 /* -----------------------------------------------------------------------------
1882  * Perform a garbage collection if necessary
1883  * -------------------------------------------------------------------------- */
1884
1885 static void
1886 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
1887               rtsBool force_major, void (*get_roots)(evac_fn))
1888 {
1889     StgTSO *t;
1890 #ifdef THREADED_RTS
1891     static volatile StgWord waiting_for_gc;
1892     rtsBool was_waiting;
1893     nat i;
1894 #endif
1895
1896 #ifdef THREADED_RTS
1897     // In order to GC, there must be no threads running Haskell code.
1898     // Therefore, the GC thread needs to hold *all* the capabilities,
1899     // and release them after the GC has completed.  
1900     //
1901     // This seems to be the simplest way: previous attempts involved
1902     // making all the threads with capabilities give up their
1903     // capabilities and sleep except for the *last* one, which
1904     // actually did the GC.  But it's quite hard to arrange for all
1905     // the other tasks to sleep and stay asleep.
1906     //
1907         
1908     was_waiting = cas(&waiting_for_gc, 0, 1);
1909     if (was_waiting) {
1910         do {
1911             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1912             if (cap) yieldCapability(&cap,task);
1913         } while (waiting_for_gc);
1914         return;  // NOTE: task->cap might have changed here
1915     }
1916
1917     for (i=0; i < n_capabilities; i++) {
1918         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1919         if (cap != &capabilities[i]) {
1920             Capability *pcap = &capabilities[i];
1921             // we better hope this task doesn't get migrated to
1922             // another Capability while we're waiting for this one.
1923             // It won't, because load balancing happens while we have
1924             // all the Capabilities, but even so it's a slightly
1925             // unsavoury invariant.
1926             task->cap = pcap;
1927             context_switch = 1;
1928             waitForReturnCapability(&pcap, task);
1929             if (pcap != &capabilities[i]) {
1930                 barf("scheduleDoGC: got the wrong capability");
1931             }
1932         }
1933     }
1934
1935     waiting_for_gc = rtsFalse;
1936 #endif
1937
1938     /* Kick any transactions which are invalid back to their
1939      * atomically frames.  When next scheduled they will try to
1940      * commit, this commit will fail and they will retry.
1941      */
1942     { 
1943         StgTSO *next;
1944
1945         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1946             if (t->what_next == ThreadRelocated) {
1947                 next = t->link;
1948             } else {
1949                 next = t->global_link;
1950                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1951                     if (!stmValidateNestOfTransactions (t -> trec)) {
1952                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1953                         
1954                         // strip the stack back to the
1955                         // ATOMICALLY_FRAME, aborting the (nested)
1956                         // transaction, and saving the stack of any
1957                         // partially-evaluated thunks on the heap.
1958                         raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL);
1959                         
1960 #ifdef REG_R1
1961                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1962 #endif
1963                     }
1964                 }
1965             }
1966         }
1967     }
1968     
1969     // so this happens periodically:
1970     if (cap) scheduleCheckBlackHoles(cap);
1971     
1972     IF_DEBUG(scheduler, printAllThreads());
1973
1974     /* everybody back, start the GC.
1975      * Could do it in this thread, or signal a condition var
1976      * to do it in another thread.  Either way, we need to
1977      * broadcast on gc_pending_cond afterward.
1978      */
1979 #if defined(THREADED_RTS)
1980     IF_DEBUG(scheduler,sched_belch("doing GC"));
1981 #endif
1982     GarbageCollect(get_roots, force_major);
1983     
1984 #if defined(THREADED_RTS)
1985     // release our stash of capabilities.
1986     for (i = 0; i < n_capabilities; i++) {
1987         if (cap != &capabilities[i]) {
1988             task->cap = &capabilities[i];
1989             releaseCapability(&capabilities[i]);
1990         }
1991     }
1992     if (cap) {
1993         task->cap = cap;
1994     } else {
1995         task->cap = NULL;
1996     }
1997 #endif
1998
1999 #if defined(GRAN)
2000     /* add a ContinueThread event to continue execution of current thread */
2001     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2002               ContinueThread,
2003               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2004     IF_GRAN_DEBUG(bq, 
2005                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2006                   G_EVENTQ(0);
2007                   G_CURR_THREADQ(0));
2008 #endif /* GRAN */
2009 }
2010
2011 /* ---------------------------------------------------------------------------
2012  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
2013  * used by Control.Concurrent for error checking.
2014  * ------------------------------------------------------------------------- */
2015  
2016 StgBool
2017 rtsSupportsBoundThreads(void)
2018 {
2019 #if defined(THREADED_RTS)
2020   return rtsTrue;
2021 #else
2022   return rtsFalse;
2023 #endif
2024 }
2025
2026 /* ---------------------------------------------------------------------------
2027  * isThreadBound(tso): check whether tso is bound to an OS thread.
2028  * ------------------------------------------------------------------------- */
2029  
2030 StgBool
2031 isThreadBound(StgTSO* tso USED_IF_THREADS)
2032 {
2033 #if defined(THREADED_RTS)
2034   return (tso->bound != NULL);
2035 #endif
2036   return rtsFalse;
2037 }
2038
2039 /* ---------------------------------------------------------------------------
2040  * Singleton fork(). Do not copy any running threads.
2041  * ------------------------------------------------------------------------- */
2042
2043 #if !defined(mingw32_HOST_OS)
2044 #define FORKPROCESS_PRIMOP_SUPPORTED
2045 #endif
2046
2047 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2048 static void 
2049 deleteThreadImmediately(Capability *cap, StgTSO *tso);
2050 #endif
2051 StgInt
2052 forkProcess(HsStablePtr *entry
2053 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2054             STG_UNUSED
2055 #endif
2056            )
2057 {
2058 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2059     Task *task;
2060     pid_t pid;
2061     StgTSO* t,*next;
2062     Capability *cap;
2063     
2064 #if defined(THREADED_RTS)
2065     if (RtsFlags.ParFlags.nNodes > 1) {
2066         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2067         stg_exit(EXIT_FAILURE);
2068     }
2069 #endif
2070
2071     IF_DEBUG(scheduler,sched_belch("forking!"));
2072     
2073     // ToDo: for SMP, we should probably acquire *all* the capabilities
2074     cap = rts_lock();
2075     
2076     pid = fork();
2077     
2078     if (pid) { // parent
2079         
2080         // just return the pid
2081         rts_unlock(cap);
2082         return pid;
2083         
2084     } else { // child
2085         
2086         // delete all threads
2087         cap->run_queue_hd = END_TSO_QUEUE;
2088         cap->run_queue_tl = END_TSO_QUEUE;
2089         
2090         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2091             next = t->link;
2092             
2093             // don't allow threads to catch the ThreadKilled exception
2094             deleteThreadImmediately(cap,t);
2095         }
2096         
2097         // wipe the task list
2098         ACQUIRE_LOCK(&sched_mutex);
2099         for (task = all_tasks; task != NULL; task=task->all_link) {
2100             if (task != cap->running_task) discardTask(task);
2101         }
2102         RELEASE_LOCK(&sched_mutex);
2103
2104         cap->suspended_ccalling_tasks = NULL;
2105
2106 #if defined(THREADED_RTS)
2107         // wipe our spare workers list.
2108         cap->spare_workers = NULL;
2109         cap->returning_tasks_hd = NULL;
2110         cap->returning_tasks_tl = NULL;
2111 #endif
2112
2113         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2114         rts_checkSchedStatus("forkProcess",cap);
2115         
2116         rts_unlock(cap);
2117         hs_exit();                      // clean up and exit
2118         stg_exit(EXIT_SUCCESS);
2119     }
2120 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2121     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2122     return -1;
2123 #endif
2124 }
2125
2126 /* ---------------------------------------------------------------------------
2127  * Delete the threads on the run queue of the current capability.
2128  * ------------------------------------------------------------------------- */
2129    
2130 static void
2131 deleteRunQueue (Capability *cap)
2132 {
2133     StgTSO *t, *next;
2134     for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2135         ASSERT(t->what_next != ThreadRelocated);
2136         next = t->link;
2137         deleteThread(cap, t);
2138     }
2139 }
2140
2141 /* startThread and  insertThread are now in GranSim.c -- HWL */
2142
2143
2144 /* -----------------------------------------------------------------------------
2145    Managing the suspended_ccalling_tasks list.
2146    Locks required: sched_mutex
2147    -------------------------------------------------------------------------- */
2148
2149 STATIC_INLINE void
2150 suspendTask (Capability *cap, Task *task)
2151 {
2152     ASSERT(task->next == NULL && task->prev == NULL);
2153     task->next = cap->suspended_ccalling_tasks;
2154     task->prev = NULL;
2155     if (cap->suspended_ccalling_tasks) {
2156         cap->suspended_ccalling_tasks->prev = task;
2157     }
2158     cap->suspended_ccalling_tasks = task;
2159 }
2160
2161 STATIC_INLINE void
2162 recoverSuspendedTask (Capability *cap, Task *task)
2163 {
2164     if (task->prev) {
2165         task->prev->next = task->next;
2166     } else {
2167         ASSERT(cap->suspended_ccalling_tasks == task);
2168         cap->suspended_ccalling_tasks = task->next;
2169     }
2170     if (task->next) {
2171         task->next->prev = task->prev;
2172     }
2173     task->next = task->prev = NULL;
2174 }
2175
2176 /* ---------------------------------------------------------------------------
2177  * Suspending & resuming Haskell threads.
2178  * 
2179  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2180  * its capability before calling the C function.  This allows another
2181  * task to pick up the capability and carry on running Haskell
2182  * threads.  It also means that if the C call blocks, it won't lock
2183  * the whole system.
2184  *
2185  * The Haskell thread making the C call is put to sleep for the
2186  * duration of the call, on the susepended_ccalling_threads queue.  We
2187  * give out a token to the task, which it can use to resume the thread
2188  * on return from the C function.
2189  * ------------------------------------------------------------------------- */
2190    
2191 void *
2192 suspendThread (StgRegTable *reg)
2193 {
2194   Capability *cap;
2195   int saved_errno = errno;
2196   StgTSO *tso;
2197   Task *task;
2198
2199   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2200    */
2201   cap = regTableToCapability(reg);
2202
2203   task = cap->running_task;
2204   tso = cap->r.rCurrentTSO;
2205
2206   IF_DEBUG(scheduler,
2207            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2208
2209   // XXX this might not be necessary --SDM
2210   tso->what_next = ThreadRunGHC;
2211
2212   threadPaused(cap,tso);
2213
2214   if(tso->blocked_exceptions == NULL)  {
2215       tso->why_blocked = BlockedOnCCall;
2216       tso->blocked_exceptions = END_TSO_QUEUE;
2217   } else {
2218       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2219   }
2220
2221   // Hand back capability
2222   task->suspended_tso = tso;
2223
2224   ACQUIRE_LOCK(&cap->lock);
2225
2226   suspendTask(cap,task);
2227   cap->in_haskell = rtsFalse;
2228   releaseCapability_(cap);
2229   
2230   RELEASE_LOCK(&cap->lock);
2231
2232 #if defined(THREADED_RTS)
2233   /* Preparing to leave the RTS, so ensure there's a native thread/task
2234      waiting to take over.
2235   */
2236   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2237 #endif
2238
2239   errno = saved_errno;
2240   return task;
2241 }
2242
2243 StgRegTable *
2244 resumeThread (void *task_)
2245 {
2246     StgTSO *tso;
2247     Capability *cap;
2248     int saved_errno = errno;
2249     Task *task = task_;
2250
2251     cap = task->cap;
2252     // Wait for permission to re-enter the RTS with the result.
2253     waitForReturnCapability(&cap,task);
2254     // we might be on a different capability now... but if so, our
2255     // entry on the suspended_ccalling_tasks list will also have been
2256     // migrated.
2257
2258     // Remove the thread from the suspended list
2259     recoverSuspendedTask(cap,task);
2260
2261     tso = task->suspended_tso;
2262     task->suspended_tso = NULL;
2263     tso->link = END_TSO_QUEUE;
2264     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2265     
2266     if (tso->why_blocked == BlockedOnCCall) {
2267         awakenBlockedQueue(cap,tso->blocked_exceptions);
2268         tso->blocked_exceptions = NULL;
2269     }
2270     
2271     /* Reset blocking status */
2272     tso->why_blocked  = NotBlocked;
2273     
2274     cap->r.rCurrentTSO = tso;
2275     cap->in_haskell = rtsTrue;
2276     errno = saved_errno;
2277
2278     /* We might have GC'd, mark the TSO dirty again */
2279     dirtyTSO(tso);
2280
2281     return &cap->r;
2282 }
2283
2284 /* ---------------------------------------------------------------------------
2285  * Comparing Thread ids.
2286  *
2287  * This is used from STG land in the implementation of the
2288  * instances of Eq/Ord for ThreadIds.
2289  * ------------------------------------------------------------------------ */
2290
2291 int
2292 cmp_thread(StgPtr tso1, StgPtr tso2) 
2293
2294   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2295   StgThreadID id2 = ((StgTSO *)tso2)->id;
2296  
2297   if (id1 < id2) return (-1);
2298   if (id1 > id2) return 1;
2299   return 0;
2300 }
2301
2302 /* ---------------------------------------------------------------------------
2303  * Fetching the ThreadID from an StgTSO.
2304  *
2305  * This is used in the implementation of Show for ThreadIds.
2306  * ------------------------------------------------------------------------ */
2307 int
2308 rts_getThreadId(StgPtr tso) 
2309 {
2310   return ((StgTSO *)tso)->id;
2311 }
2312
2313 #ifdef DEBUG
2314 void
2315 labelThread(StgPtr tso, char *label)
2316 {
2317   int len;
2318   void *buf;
2319
2320   /* Caveat: Once set, you can only set the thread name to "" */
2321   len = strlen(label)+1;
2322   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2323   strncpy(buf,label,len);
2324   /* Update will free the old memory for us */
2325   updateThreadLabel(((StgTSO *)tso)->id,buf);
2326 }
2327 #endif /* DEBUG */
2328
2329 /* ---------------------------------------------------------------------------
2330    Create a new thread.
2331
2332    The new thread starts with the given stack size.  Before the
2333    scheduler can run, however, this thread needs to have a closure
2334    (and possibly some arguments) pushed on its stack.  See
2335    pushClosure() in Schedule.h.
2336
2337    createGenThread() and createIOThread() (in SchedAPI.h) are
2338    convenient packaged versions of this function.
2339
2340    currently pri (priority) is only used in a GRAN setup -- HWL
2341    ------------------------------------------------------------------------ */
2342 #if defined(GRAN)
2343 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2344 StgTSO *
2345 createThread(nat size, StgInt pri)
2346 #else
2347 StgTSO *
2348 createThread(Capability *cap, nat size)
2349 #endif
2350 {
2351     StgTSO *tso;
2352     nat stack_size;
2353
2354     /* sched_mutex is *not* required */
2355
2356     /* First check whether we should create a thread at all */
2357 #if defined(PARALLEL_HASKELL)
2358     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2359     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2360         threadsIgnored++;
2361         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2362                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2363         return END_TSO_QUEUE;
2364     }
2365     threadsCreated++;
2366 #endif
2367
2368 #if defined(GRAN)
2369     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2370 #endif
2371
2372     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2373
2374     /* catch ridiculously small stack sizes */
2375     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2376         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2377     }
2378
2379     stack_size = size - TSO_STRUCT_SIZEW;
2380     
2381     tso = (StgTSO *)allocateLocal(cap, size);
2382     TICK_ALLOC_TSO(stack_size, 0);
2383
2384     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2385 #if defined(GRAN)
2386     SET_GRAN_HDR(tso, ThisPE);
2387 #endif
2388
2389     // Always start with the compiled code evaluator
2390     tso->what_next = ThreadRunGHC;
2391
2392     tso->why_blocked  = NotBlocked;
2393     tso->blocked_exceptions = NULL;
2394     tso->flags = TSO_DIRTY;
2395     
2396     tso->saved_errno = 0;
2397     tso->bound = NULL;
2398     
2399     tso->stack_size     = stack_size;
2400     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2401                           - TSO_STRUCT_SIZEW;
2402     tso->sp             = (P_)&(tso->stack) + stack_size;
2403
2404     tso->trec = NO_TREC;
2405     
2406 #ifdef PROFILING
2407     tso->prof.CCCS = CCS_MAIN;
2408 #endif
2409     
2410   /* put a stop frame on the stack */
2411     tso->sp -= sizeofW(StgStopFrame);
2412     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2413     tso->link = END_TSO_QUEUE;
2414     
2415   // ToDo: check this
2416 #if defined(GRAN)
2417     /* uses more flexible routine in GranSim */
2418     insertThread(tso, CurrentProc);
2419 #else
2420     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2421      * from its creation
2422      */
2423 #endif
2424     
2425 #if defined(GRAN) 
2426     if (RtsFlags.GranFlags.GranSimStats.Full) 
2427         DumpGranEvent(GR_START,tso);
2428 #elif defined(PARALLEL_HASKELL)
2429     if (RtsFlags.ParFlags.ParStats.Full) 
2430         DumpGranEvent(GR_STARTQ,tso);
2431     /* HACk to avoid SCHEDULE 
2432        LastTSO = tso; */
2433 #endif
2434     
2435     /* Link the new thread on the global thread list.
2436      */
2437     ACQUIRE_LOCK(&sched_mutex);
2438     tso->id = next_thread_id++;  // while we have the mutex
2439     tso->global_link = all_threads;
2440     all_threads = tso;
2441     RELEASE_LOCK(&sched_mutex);
2442     
2443 #if defined(DIST)
2444     tso->dist.priority = MandatoryPriority; //by default that is...
2445 #endif
2446     
2447 #if defined(GRAN)
2448     tso->gran.pri = pri;
2449 # if defined(DEBUG)
2450     tso->gran.magic = TSO_MAGIC; // debugging only
2451 # endif
2452     tso->gran.sparkname   = 0;
2453     tso->gran.startedat   = CURRENT_TIME; 
2454     tso->gran.exported    = 0;
2455     tso->gran.basicblocks = 0;
2456     tso->gran.allocs      = 0;
2457     tso->gran.exectime    = 0;
2458     tso->gran.fetchtime   = 0;
2459     tso->gran.fetchcount  = 0;
2460     tso->gran.blocktime   = 0;
2461     tso->gran.blockcount  = 0;
2462     tso->gran.blockedat   = 0;
2463     tso->gran.globalsparks = 0;
2464     tso->gran.localsparks  = 0;
2465     if (RtsFlags.GranFlags.Light)
2466         tso->gran.clock  = Now; /* local clock */
2467     else
2468         tso->gran.clock  = 0;
2469     
2470     IF_DEBUG(gran,printTSO(tso));
2471 #elif defined(PARALLEL_HASKELL)
2472 # if defined(DEBUG)
2473     tso->par.magic = TSO_MAGIC; // debugging only
2474 # endif
2475     tso->par.sparkname   = 0;
2476     tso->par.startedat   = CURRENT_TIME; 
2477     tso->par.exported    = 0;
2478     tso->par.basicblocks = 0;
2479     tso->par.allocs      = 0;
2480     tso->par.exectime    = 0;
2481     tso->par.fetchtime   = 0;
2482     tso->par.fetchcount  = 0;
2483     tso->par.blocktime   = 0;
2484     tso->par.blockcount  = 0;
2485     tso->par.blockedat   = 0;
2486     tso->par.globalsparks = 0;
2487     tso->par.localsparks  = 0;
2488 #endif
2489     
2490 #if defined(GRAN)
2491     globalGranStats.tot_threads_created++;
2492     globalGranStats.threads_created_on_PE[CurrentProc]++;
2493     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2494     globalGranStats.tot_sq_probes++;
2495 #elif defined(PARALLEL_HASKELL)
2496     // collect parallel global statistics (currently done together with GC stats)
2497     if (RtsFlags.ParFlags.ParStats.Global &&
2498         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2499         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2500         globalParStats.tot_threads_created++;
2501     }
2502 #endif 
2503     
2504 #if defined(GRAN)
2505     IF_GRAN_DEBUG(pri,
2506                   sched_belch("==__ schedule: Created TSO %d (%p);",
2507                               CurrentProc, tso, tso->id));
2508 #elif defined(PARALLEL_HASKELL)
2509     IF_PAR_DEBUG(verbose,
2510                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2511                              (long)tso->id, tso, advisory_thread_count));
2512 #else
2513     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2514                                    (long)tso->id, (long)tso->stack_size));
2515 #endif    
2516     return tso;
2517 }
2518
2519 #if defined(PAR)
2520 /* RFP:
2521    all parallel thread creation calls should fall through the following routine.
2522 */
2523 StgTSO *
2524 createThreadFromSpark(rtsSpark spark) 
2525 { StgTSO *tso;
2526   ASSERT(spark != (rtsSpark)NULL);
2527 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2528   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2529   { threadsIgnored++;
2530     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2531           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2532     return END_TSO_QUEUE;
2533   }
2534   else
2535   { threadsCreated++;
2536     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2537     if (tso==END_TSO_QUEUE)     
2538       barf("createSparkThread: Cannot create TSO");
2539 #if defined(DIST)
2540     tso->priority = AdvisoryPriority;
2541 #endif
2542     pushClosure(tso,spark);
2543     addToRunQueue(tso);
2544     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2545   }
2546   return tso;
2547 }
2548 #endif
2549
2550 /*
2551   Turn a spark into a thread.
2552   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2553 */
2554 #if 0
2555 StgTSO *
2556 activateSpark (rtsSpark spark) 
2557 {
2558   StgTSO *tso;
2559
2560   tso = createSparkThread(spark);
2561   if (RtsFlags.ParFlags.ParStats.Full) {   
2562     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2563       IF_PAR_DEBUG(verbose,
2564                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2565                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2566   }
2567   // ToDo: fwd info on local/global spark to thread -- HWL
2568   // tso->gran.exported =  spark->exported;
2569   // tso->gran.locked =   !spark->global;
2570   // tso->gran.sparkname = spark->name;
2571
2572   return tso;
2573 }
2574 #endif
2575
2576 /* ---------------------------------------------------------------------------
2577  * scheduleThread()
2578  *
2579  * scheduleThread puts a thread on the end  of the runnable queue.
2580  * This will usually be done immediately after a thread is created.
2581  * The caller of scheduleThread must create the thread using e.g.
2582  * createThread and push an appropriate closure
2583  * on this thread's stack before the scheduler is invoked.
2584  * ------------------------------------------------------------------------ */
2585
2586 void
2587 scheduleThread(Capability *cap, StgTSO *tso)
2588 {
2589     // The thread goes at the *end* of the run-queue, to avoid possible
2590     // starvation of any threads already on the queue.
2591     appendToRunQueue(cap,tso);
2592 }
2593
2594 Capability *
2595 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2596 {
2597     Task *task;
2598
2599     // We already created/initialised the Task
2600     task = cap->running_task;
2601
2602     // This TSO is now a bound thread; make the Task and TSO
2603     // point to each other.
2604     tso->bound = task;
2605
2606     task->tso = tso;
2607     task->ret = ret;
2608     task->stat = NoStatus;
2609
2610     appendToRunQueue(cap,tso);
2611
2612     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2613
2614 #if defined(GRAN)
2615     /* GranSim specific init */
2616     CurrentTSO = m->tso;                // the TSO to run
2617     procStatus[MainProc] = Busy;        // status of main PE
2618     CurrentProc = MainProc;             // PE to run it on
2619 #endif
2620
2621     cap = schedule(cap,task);
2622
2623     ASSERT(task->stat != NoStatus);
2624     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2625
2626     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2627     return cap;
2628 }
2629
2630 /* ----------------------------------------------------------------------------
2631  * Starting Tasks
2632  * ------------------------------------------------------------------------- */
2633
2634 #if defined(THREADED_RTS)
2635 void
2636 workerStart(Task *task)
2637 {
2638     Capability *cap;
2639
2640     // See startWorkerTask().
2641     ACQUIRE_LOCK(&task->lock);
2642     cap = task->cap;
2643     RELEASE_LOCK(&task->lock);
2644
2645     // set the thread-local pointer to the Task:
2646     taskEnter(task);
2647
2648     // schedule() runs without a lock.
2649     cap = schedule(cap,task);
2650
2651     // On exit from schedule(), we have a Capability.
2652     releaseCapability(cap);
2653     taskStop(task);
2654 }
2655 #endif
2656
2657 /* ---------------------------------------------------------------------------
2658  * initScheduler()
2659  *
2660  * Initialise the scheduler.  This resets all the queues - if the
2661  * queues contained any threads, they'll be garbage collected at the
2662  * next pass.
2663  *
2664  * ------------------------------------------------------------------------ */
2665
2666 void 
2667 initScheduler(void)
2668 {
2669 #if defined(GRAN)
2670   nat i;
2671   for (i=0; i<=MAX_PROC; i++) {
2672     run_queue_hds[i]      = END_TSO_QUEUE;
2673     run_queue_tls[i]      = END_TSO_QUEUE;
2674     blocked_queue_hds[i]  = END_TSO_QUEUE;
2675     blocked_queue_tls[i]  = END_TSO_QUEUE;
2676     ccalling_threadss[i]  = END_TSO_QUEUE;
2677     blackhole_queue[i]    = END_TSO_QUEUE;
2678     sleeping_queue        = END_TSO_QUEUE;
2679   }
2680 #elif !defined(THREADED_RTS)
2681   blocked_queue_hd  = END_TSO_QUEUE;
2682   blocked_queue_tl  = END_TSO_QUEUE;
2683   sleeping_queue    = END_TSO_QUEUE;
2684 #endif
2685
2686   blackhole_queue   = END_TSO_QUEUE;
2687   all_threads       = END_TSO_QUEUE;
2688
2689   context_switch = 0;
2690   interrupted    = 0;
2691
2692   RtsFlags.ConcFlags.ctxtSwitchTicks =
2693       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2694       
2695 #if defined(THREADED_RTS)
2696   /* Initialise the mutex and condition variables used by
2697    * the scheduler. */
2698   initMutex(&sched_mutex);
2699 #endif
2700   
2701   ACQUIRE_LOCK(&sched_mutex);
2702
2703   /* A capability holds the state a native thread needs in
2704    * order to execute STG code. At least one capability is
2705    * floating around (only THREADED_RTS builds have more than one).
2706    */
2707   initCapabilities();
2708
2709   initTaskManager();
2710
2711 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2712   initSparkPools();
2713 #endif
2714
2715 #if defined(THREADED_RTS)
2716   /*
2717    * Eagerly start one worker to run each Capability, except for
2718    * Capability 0.  The idea is that we're probably going to start a
2719    * bound thread on Capability 0 pretty soon, so we don't want a
2720    * worker task hogging it.
2721    */
2722   { 
2723       nat i;
2724       Capability *cap;
2725       for (i = 1; i < n_capabilities; i++) {
2726           cap = &capabilities[i];
2727           ACQUIRE_LOCK(&cap->lock);
2728           startWorkerTask(cap, workerStart);
2729           RELEASE_LOCK(&cap->lock);
2730       }
2731   }
2732 #endif
2733
2734   RELEASE_LOCK(&sched_mutex);
2735 }
2736
2737 void
2738 exitScheduler( void )
2739 {
2740     interrupted = rtsTrue;
2741     shutting_down_scheduler = rtsTrue;
2742
2743 #if defined(THREADED_RTS)
2744     { 
2745         Task *task;
2746         nat i;
2747         
2748         ACQUIRE_LOCK(&sched_mutex);
2749         task = newBoundTask();
2750         RELEASE_LOCK(&sched_mutex);
2751
2752         for (i = 0; i < n_capabilities; i++) {
2753             shutdownCapability(&capabilities[i], task);
2754         }
2755         boundTaskExiting(task);
2756         stopTaskManager();
2757     }
2758 #endif
2759 }
2760
2761 /* ---------------------------------------------------------------------------
2762    Where are the roots that we know about?
2763
2764         - all the threads on the runnable queue
2765         - all the threads on the blocked queue
2766         - all the threads on the sleeping queue
2767         - all the thread currently executing a _ccall_GC
2768         - all the "main threads"
2769      
2770    ------------------------------------------------------------------------ */
2771
2772 /* This has to be protected either by the scheduler monitor, or by the
2773         garbage collection monitor (probably the latter).
2774         KH @ 25/10/99
2775 */
2776
2777 void
2778 GetRoots( evac_fn evac )
2779 {
2780     nat i;
2781     Capability *cap;
2782     Task *task;
2783
2784 #if defined(GRAN)
2785     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2786         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2787             evac((StgClosure **)&run_queue_hds[i]);
2788         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2789             evac((StgClosure **)&run_queue_tls[i]);
2790         
2791         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2792             evac((StgClosure **)&blocked_queue_hds[i]);
2793         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2794             evac((StgClosure **)&blocked_queue_tls[i]);
2795         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2796             evac((StgClosure **)&ccalling_threads[i]);
2797     }
2798
2799     markEventQueue();
2800
2801 #else /* !GRAN */
2802
2803     for (i = 0; i < n_capabilities; i++) {
2804         cap = &capabilities[i];
2805         evac((StgClosure **)&cap->run_queue_hd);
2806         evac((StgClosure **)&cap->run_queue_tl);
2807         
2808         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2809              task=task->next) {
2810             evac((StgClosure **)&task->suspended_tso);
2811         }
2812     }
2813     
2814 #if !defined(THREADED_RTS)
2815     evac((StgClosure **)(void *)&blocked_queue_hd);
2816     evac((StgClosure **)(void *)&blocked_queue_tl);
2817     evac((StgClosure **)(void *)&sleeping_queue);
2818 #endif 
2819 #endif
2820
2821     // evac((StgClosure **)&blackhole_queue);
2822
2823 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2824     markSparkQueue(evac);
2825 #endif
2826     
2827 #if defined(RTS_USER_SIGNALS)
2828     // mark the signal handlers (signals should be already blocked)
2829     markSignalHandlers(evac);
2830 #endif
2831 }
2832
2833 /* -----------------------------------------------------------------------------
2834    performGC
2835
2836    This is the interface to the garbage collector from Haskell land.
2837    We provide this so that external C code can allocate and garbage
2838    collect when called from Haskell via _ccall_GC.
2839
2840    It might be useful to provide an interface whereby the programmer
2841    can specify more roots (ToDo).
2842    
2843    This needs to be protected by the GC condition variable above.  KH.
2844    -------------------------------------------------------------------------- */
2845
2846 static void (*extra_roots)(evac_fn);
2847
2848 static void
2849 performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
2850 {
2851     Task *task = myTask();
2852
2853     if (task == NULL) {
2854         ACQUIRE_LOCK(&sched_mutex);
2855         task = newBoundTask();
2856         RELEASE_LOCK(&sched_mutex);
2857         scheduleDoGC(NULL,task,force_major, get_roots);
2858         boundTaskExiting(task);
2859     } else {
2860         scheduleDoGC(NULL,task,force_major, get_roots);
2861     }
2862 }
2863
2864 void
2865 performGC(void)
2866 {
2867     performGC_(rtsFalse, GetRoots);
2868 }
2869
2870 void
2871 performMajorGC(void)
2872 {
2873     performGC_(rtsTrue, GetRoots);
2874 }
2875
2876 static void
2877 AllRoots(evac_fn evac)
2878 {
2879     GetRoots(evac);             // the scheduler's roots
2880     extra_roots(evac);          // the user's roots
2881 }
2882
2883 void
2884 performGCWithRoots(void (*get_roots)(evac_fn))
2885 {
2886     extra_roots = get_roots;
2887     performGC_(rtsFalse, AllRoots);
2888 }
2889
2890 /* -----------------------------------------------------------------------------
2891    Stack overflow
2892
2893    If the thread has reached its maximum stack size, then raise the
2894    StackOverflow exception in the offending thread.  Otherwise
2895    relocate the TSO into a larger chunk of memory and adjust its stack
2896    size appropriately.
2897    -------------------------------------------------------------------------- */
2898
2899 static StgTSO *
2900 threadStackOverflow(Capability *cap, StgTSO *tso)
2901 {
2902   nat new_stack_size, stack_words;
2903   lnat new_tso_size;
2904   StgPtr new_sp;
2905   StgTSO *dest;
2906
2907   IF_DEBUG(sanity,checkTSO(tso));
2908   if (tso->stack_size >= tso->max_stack_size) {
2909
2910     IF_DEBUG(gc,
2911              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2912                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2913              /* If we're debugging, just print out the top of the stack */
2914              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2915                                               tso->sp+64)));
2916
2917     /* Send this thread the StackOverflow exception */
2918     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2919     return tso;
2920   }
2921
2922   /* Try to double the current stack size.  If that takes us over the
2923    * maximum stack size for this thread, then use the maximum instead.
2924    * Finally round up so the TSO ends up as a whole number of blocks.
2925    */
2926   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2927   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2928                                        TSO_STRUCT_SIZE)/sizeof(W_);
2929   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2930   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2931
2932   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2933
2934   dest = (StgTSO *)allocate(new_tso_size);
2935   TICK_ALLOC_TSO(new_stack_size,0);
2936
2937   /* copy the TSO block and the old stack into the new area */
2938   memcpy(dest,tso,TSO_STRUCT_SIZE);
2939   stack_words = tso->stack + tso->stack_size - tso->sp;
2940   new_sp = (P_)dest + new_tso_size - stack_words;
2941   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2942
2943   /* relocate the stack pointers... */
2944   dest->sp         = new_sp;
2945   dest->stack_size = new_stack_size;
2946         
2947   /* Mark the old TSO as relocated.  We have to check for relocated
2948    * TSOs in the garbage collector and any primops that deal with TSOs.
2949    *
2950    * It's important to set the sp value to just beyond the end
2951    * of the stack, so we don't attempt to scavenge any part of the
2952    * dead TSO's stack.
2953    */
2954   tso->what_next = ThreadRelocated;
2955   tso->link = dest;
2956   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2957   tso->why_blocked = NotBlocked;
2958
2959   IF_PAR_DEBUG(verbose,
2960                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2961                      tso->id, tso, tso->stack_size);
2962                /* If we're debugging, just print out the top of the stack */
2963                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2964                                                 tso->sp+64)));
2965   
2966   IF_DEBUG(sanity,checkTSO(tso));
2967 #if 0
2968   IF_DEBUG(scheduler,printTSO(dest));
2969 #endif
2970
2971   return dest;
2972 }
2973
2974 /* ---------------------------------------------------------------------------
2975    Wake up a queue that was blocked on some resource.
2976    ------------------------------------------------------------------------ */
2977
2978 #if defined(GRAN)
2979 STATIC_INLINE void
2980 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2981 {
2982 }
2983 #elif defined(PARALLEL_HASKELL)
2984 STATIC_INLINE void
2985 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2986 {
2987   /* write RESUME events to log file and
2988      update blocked and fetch time (depending on type of the orig closure) */
2989   if (RtsFlags.ParFlags.ParStats.Full) {
2990     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2991                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2992                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2993     if (emptyRunQueue())
2994       emitSchedule = rtsTrue;
2995
2996     switch (get_itbl(node)->type) {
2997         case FETCH_ME_BQ:
2998           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2999           break;
3000         case RBH:
3001         case FETCH_ME:
3002         case BLACKHOLE_BQ:
3003           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3004           break;
3005 #ifdef DIST
3006         case MVAR:
3007           break;
3008 #endif    
3009         default:
3010           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
3011         }
3012       }
3013 }
3014 #endif
3015
3016 #if defined(GRAN)
3017 StgBlockingQueueElement *
3018 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3019 {
3020     StgTSO *tso;
3021     PEs node_loc, tso_loc;
3022
3023     node_loc = where_is(node); // should be lifted out of loop
3024     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3025     tso_loc = where_is((StgClosure *)tso);
3026     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3027       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
3028       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3029       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3030       // insertThread(tso, node_loc);
3031       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3032                 ResumeThread,
3033                 tso, node, (rtsSpark*)NULL);
3034       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3035       // len_local++;
3036       // len++;
3037     } else { // TSO is remote (actually should be FMBQ)
3038       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3039                                   RtsFlags.GranFlags.Costs.gunblocktime +
3040                                   RtsFlags.GranFlags.Costs.latency;
3041       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3042                 UnblockThread,
3043                 tso, node, (rtsSpark*)NULL);
3044       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3045       // len++;
3046     }
3047     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3048     IF_GRAN_DEBUG(bq,
3049                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3050                           (node_loc==tso_loc ? "Local" : "Global"), 
3051                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3052     tso->block_info.closure = NULL;
3053     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
3054                              tso->id, tso));
3055 }
3056 #elif defined(PARALLEL_HASKELL)
3057 StgBlockingQueueElement *
3058 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3059 {
3060     StgBlockingQueueElement *next;
3061
3062     switch (get_itbl(bqe)->type) {
3063     case TSO:
3064       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3065       /* if it's a TSO just push it onto the run_queue */
3066       next = bqe->link;
3067       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3068       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3069       threadRunnable();
3070       unblockCount(bqe, node);
3071       /* reset blocking status after dumping event */
3072       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3073       break;
3074
3075     case BLOCKED_FETCH:
3076       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3077       next = bqe->link;
3078       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3079       PendingFetches = (StgBlockedFetch *)bqe;
3080       break;
3081
3082 # if defined(DEBUG)
3083       /* can ignore this case in a non-debugging setup; 
3084          see comments on RBHSave closures above */
3085     case CONSTR:
3086       /* check that the closure is an RBHSave closure */
3087       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3088              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3089              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3090       break;
3091
3092     default:
3093       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3094            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3095            (StgClosure *)bqe);
3096 # endif
3097     }
3098   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3099   return next;
3100 }
3101 #endif
3102
3103 StgTSO *
3104 unblockOne(Capability *cap, StgTSO *tso)
3105 {
3106   StgTSO *next;
3107
3108   ASSERT(get_itbl(tso)->type == TSO);
3109   ASSERT(tso->why_blocked != NotBlocked);
3110   tso->why_blocked = NotBlocked;
3111   next = tso->link;
3112   tso->link = END_TSO_QUEUE;
3113
3114   // We might have just migrated this TSO to our Capability:
3115   if (tso->bound) {
3116       tso->bound->cap = cap;
3117   }
3118
3119   appendToRunQueue(cap,tso);
3120
3121   // we're holding a newly woken thread, make sure we context switch
3122   // quickly so we can migrate it if necessary.
3123   context_switch = 1;
3124   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3125   return next;
3126 }
3127
3128
3129 #if defined(GRAN)
3130 void 
3131 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3132 {
3133   StgBlockingQueueElement *bqe;
3134   PEs node_loc;
3135   nat len = 0; 
3136
3137   IF_GRAN_DEBUG(bq, 
3138                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3139                       node, CurrentProc, CurrentTime[CurrentProc], 
3140                       CurrentTSO->id, CurrentTSO));
3141
3142   node_loc = where_is(node);
3143
3144   ASSERT(q == END_BQ_QUEUE ||
3145          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3146          get_itbl(q)->type == CONSTR); // closure (type constructor)
3147   ASSERT(is_unique(node));
3148
3149   /* FAKE FETCH: magically copy the node to the tso's proc;
3150      no Fetch necessary because in reality the node should not have been 
3151      moved to the other PE in the first place
3152   */
3153   if (CurrentProc!=node_loc) {
3154     IF_GRAN_DEBUG(bq, 
3155                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3156                         node, node_loc, CurrentProc, CurrentTSO->id, 
3157                         // CurrentTSO, where_is(CurrentTSO),
3158                         node->header.gran.procs));
3159     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3160     IF_GRAN_DEBUG(bq, 
3161                   debugBelch("## new bitmask of node %p is %#x\n",
3162                         node, node->header.gran.procs));
3163     if (RtsFlags.GranFlags.GranSimStats.Global) {
3164       globalGranStats.tot_fake_fetches++;
3165     }
3166   }
3167
3168   bqe = q;
3169   // ToDo: check: ASSERT(CurrentProc==node_loc);
3170   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3171     //next = bqe->link;
3172     /* 
3173        bqe points to the current element in the queue
3174        next points to the next element in the queue
3175     */
3176     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3177     //tso_loc = where_is(tso);
3178     len++;
3179     bqe = unblockOne(bqe, node);
3180   }
3181
3182   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3183      the closure to make room for the anchor of the BQ */
3184   if (bqe!=END_BQ_QUEUE) {
3185     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3186     /*
3187     ASSERT((info_ptr==&RBH_Save_0_info) ||
3188            (info_ptr==&RBH_Save_1_info) ||
3189            (info_ptr==&RBH_Save_2_info));
3190     */
3191     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3192     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3193     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3194
3195     IF_GRAN_DEBUG(bq,
3196                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3197                         node, info_type(node)));
3198   }
3199
3200   /* statistics gathering */
3201   if (RtsFlags.GranFlags.GranSimStats.Global) {
3202     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3203     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3204     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3205     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3206   }
3207   IF_GRAN_DEBUG(bq,
3208                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3209                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3210 }
3211 #elif defined(PARALLEL_HASKELL)
3212 void 
3213 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3214 {
3215   StgBlockingQueueElement *bqe;
3216
3217   IF_PAR_DEBUG(verbose, 
3218                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3219                      node, mytid));
3220 #ifdef DIST  
3221   //RFP
3222   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3223     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3224     return;
3225   }
3226 #endif
3227   
3228   ASSERT(q == END_BQ_QUEUE ||
3229          get_itbl(q)->type == TSO ||           
3230          get_itbl(q)->type == BLOCKED_FETCH || 
3231          get_itbl(q)->type == CONSTR); 
3232
3233   bqe = q;
3234   while (get_itbl(bqe)->type==TSO || 
3235          get_itbl(bqe)->type==BLOCKED_FETCH) {
3236     bqe = unblockOne(bqe, node);
3237   }
3238 }
3239
3240 #else   /* !GRAN && !PARALLEL_HASKELL */
3241
3242 void
3243 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3244 {
3245     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3246                              // Exception.cmm
3247     while (tso != END_TSO_QUEUE) {
3248         tso = unblockOne(cap,tso);
3249     }
3250 }
3251 #endif
3252
3253 /* ---------------------------------------------------------------------------
3254    Interrupt execution
3255    - usually called inside a signal handler so it mustn't do anything fancy.   
3256    ------------------------------------------------------------------------ */
3257
3258 void
3259 interruptStgRts(void)
3260 {
3261     interrupted    = 1;
3262     context_switch = 1;
3263 #if defined(THREADED_RTS)
3264     prodAllCapabilities();
3265 #endif
3266 }
3267
3268 /* -----------------------------------------------------------------------------
3269    Unblock a thread
3270
3271    This is for use when we raise an exception in another thread, which
3272    may be blocked.
3273    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3274    -------------------------------------------------------------------------- */
3275
3276 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3277 /*
3278   NB: only the type of the blocking queue is different in GranSim and GUM
3279       the operations on the queue-elements are the same
3280       long live polymorphism!
3281
3282   Locks: sched_mutex is held upon entry and exit.
3283
3284 */
3285 static void
3286 unblockThread(Capability *cap, StgTSO *tso)
3287 {
3288   StgBlockingQueueElement *t, **last;
3289
3290   switch (tso->why_blocked) {
3291
3292   case NotBlocked:
3293     return;  /* not blocked */
3294
3295   case BlockedOnSTM:
3296     // Be careful: nothing to do here!  We tell the scheduler that the thread
3297     // is runnable and we leave it to the stack-walking code to abort the 
3298     // transaction while unwinding the stack.  We should perhaps have a debugging
3299     // test to make sure that this really happens and that the 'zombie' transaction
3300     // does not get committed.
3301     goto done;
3302
3303   case BlockedOnMVar:
3304     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3305     {
3306       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3307       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3308
3309       last = (StgBlockingQueueElement **)&mvar->head;
3310       for (t = (StgBlockingQueueElement *)mvar->head; 
3311            t != END_BQ_QUEUE; 
3312            last = &t->link, last_tso = t, t = t->link) {
3313         if (t == (StgBlockingQueueElement *)tso) {
3314           *last = (StgBlockingQueueElement *)tso->link;
3315           if (mvar->tail == tso) {
3316             mvar->tail = (StgTSO *)last_tso;
3317           }
3318           goto done;
3319         }
3320       }
3321       barf("unblockThread (MVAR): TSO not found");
3322     }
3323
3324   case BlockedOnBlackHole:
3325     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3326     {
3327       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3328
3329       last = &bq->blocking_queue;
3330       for (t = bq->blocking_queue; 
3331            t != END_BQ_QUEUE; 
3332            last = &t->link, t = t->link) {
3333         if (t == (StgBlockingQueueElement *)tso) {
3334           *last = (StgBlockingQueueElement *)tso->link;
3335           goto done;
3336         }
3337       }
3338       barf("unblockThread (BLACKHOLE): TSO not found");
3339     }
3340
3341   case BlockedOnException:
3342     {
3343       StgTSO *target  = tso->block_info.tso;
3344
3345       ASSERT(get_itbl(target)->type == TSO);
3346
3347       if (target->what_next == ThreadRelocated) {
3348           target = target->link;
3349           ASSERT(get_itbl(target)->type == TSO);
3350       }
3351
3352       ASSERT(target->blocked_exceptions != NULL);
3353
3354       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3355       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3356            t != END_BQ_QUEUE; 
3357            last = &t->link, t = t->link) {
3358         ASSERT(get_itbl(t)->type == TSO);
3359         if (t == (StgBlockingQueueElement *)tso) {
3360           *last = (StgBlockingQueueElement *)tso->link;
3361           goto done;
3362         }
3363       }
3364       barf("unblockThread (Exception): TSO not found");
3365     }
3366
3367   case BlockedOnRead:
3368   case BlockedOnWrite:
3369 #if defined(mingw32_HOST_OS)
3370   case BlockedOnDoProc:
3371 #endif
3372     {
3373       /* take TSO off blocked_queue */
3374       StgBlockingQueueElement *prev = NULL;
3375       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3376            prev = t, t = t->link) {
3377         if (t == (StgBlockingQueueElement *)tso) {
3378           if (prev == NULL) {
3379             blocked_queue_hd = (StgTSO *)t->link;
3380             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3381               blocked_queue_tl = END_TSO_QUEUE;
3382             }
3383           } else {
3384             prev->link = t->link;
3385             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3386               blocked_queue_tl = (StgTSO *)prev;
3387             }
3388           }
3389 #if defined(mingw32_HOST_OS)
3390           /* (Cooperatively) signal that the worker thread should abort
3391            * the request.
3392            */
3393           abandonWorkRequest(tso->block_info.async_result->reqID);
3394 #endif
3395           goto done;
3396         }
3397       }
3398       barf("unblockThread (I/O): TSO not found");
3399     }
3400
3401   case BlockedOnDelay:
3402     {
3403       /* take TSO off sleeping_queue */
3404       StgBlockingQueueElement *prev = NULL;
3405       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3406            prev = t, t = t->link) {
3407         if (t == (StgBlockingQueueElement *)tso) {
3408           if (prev == NULL) {
3409             sleeping_queue = (StgTSO *)t->link;
3410           } else {
3411             prev->link = t->link;
3412           }
3413           goto done;
3414         }
3415       }
3416       barf("unblockThread (delay): TSO not found");
3417     }
3418
3419   default:
3420     barf("unblockThread");
3421   }
3422
3423  done:
3424   tso->link = END_TSO_QUEUE;
3425   tso->why_blocked = NotBlocked;
3426   tso->block_info.closure = NULL;
3427   pushOnRunQueue(cap,tso);
3428 }
3429 #else
3430 static void
3431 unblockThread(Capability *cap, StgTSO *tso)
3432 {
3433   StgTSO *t, **last;
3434   
3435   /* To avoid locking unnecessarily. */
3436   if (tso->why_blocked == NotBlocked) {
3437     return;
3438   }
3439
3440   switch (tso->why_blocked) {
3441
3442   case BlockedOnSTM:
3443     // Be careful: nothing to do here!  We tell the scheduler that the thread
3444     // is runnable and we leave it to the stack-walking code to abort the 
3445     // transaction while unwinding the stack.  We should perhaps have a debugging
3446     // test to make sure that this really happens and that the 'zombie' transaction
3447     // does not get committed.
3448     goto done;
3449
3450   case BlockedOnMVar:
3451     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3452     {
3453       StgTSO *last_tso = END_TSO_QUEUE;
3454       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3455
3456       last = &mvar->head;
3457       for (t = mvar->head; t != END_TSO_QUEUE; 
3458            last = &t->link, last_tso = t, t = t->link) {
3459         if (t == tso) {
3460           *last = tso->link;
3461           if (mvar->tail == tso) {
3462             mvar->tail = last_tso;
3463           }
3464           goto done;
3465         }
3466       }
3467       barf("unblockThread (MVAR): TSO not found");
3468     }
3469
3470   case BlockedOnBlackHole:
3471     {
3472       last = &blackhole_queue;
3473       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3474            last = &t->link, t = t->link) {
3475         if (t == tso) {
3476           *last = tso->link;
3477           goto done;
3478         }
3479       }
3480       barf("unblockThread (BLACKHOLE): TSO not found");
3481     }
3482
3483   case BlockedOnException:
3484     {
3485       StgTSO *target  = tso->block_info.tso;
3486
3487       ASSERT(get_itbl(target)->type == TSO);
3488
3489       while (target->what_next == ThreadRelocated) {
3490           target = target->link;
3491           ASSERT(get_itbl(target)->type == TSO);
3492       }
3493       
3494       ASSERT(target->blocked_exceptions != NULL);
3495
3496       last = &target->blocked_exceptions;
3497       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3498            last = &t->link, t = t->link) {
3499         ASSERT(get_itbl(t)->type == TSO);
3500         if (t == tso) {
3501           *last = tso->link;
3502           goto done;
3503         }
3504       }
3505       barf("unblockThread (Exception): TSO not found");
3506     }
3507
3508 #if !defined(THREADED_RTS)
3509   case BlockedOnRead:
3510   case BlockedOnWrite:
3511 #if defined(mingw32_HOST_OS)
3512   case BlockedOnDoProc:
3513 #endif
3514     {
3515       StgTSO *prev = NULL;
3516       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3517            prev = t, t = t->link) {
3518         if (t == tso) {
3519           if (prev == NULL) {
3520             blocked_queue_hd = t->link;
3521             if (blocked_queue_tl == t) {
3522               blocked_queue_tl = END_TSO_QUEUE;
3523             }
3524           } else {
3525             prev->link = t->link;
3526             if (blocked_queue_tl == t) {
3527               blocked_queue_tl = prev;
3528             }
3529           }
3530 #if defined(mingw32_HOST_OS)
3531           /* (Cooperatively) signal that the worker thread should abort
3532            * the request.
3533            */
3534           abandonWorkRequest(tso->block_info.async_result->reqID);
3535 #endif
3536           goto done;
3537         }
3538       }
3539       barf("unblockThread (I/O): TSO not found");
3540     }
3541
3542   case BlockedOnDelay:
3543     {
3544       StgTSO *prev = NULL;
3545       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3546            prev = t, t = t->link) {
3547         if (t == tso) {
3548           if (prev == NULL) {
3549             sleeping_queue = t->link;
3550           } else {
3551             prev->link = t->link;
3552           }
3553           goto done;
3554         }
3555       }
3556       barf("unblockThread (delay): TSO not found");
3557     }
3558 #endif
3559
3560   default:
3561     barf("unblockThread");
3562   }
3563
3564  done:
3565   tso->link = END_TSO_QUEUE;
3566   tso->why_blocked = NotBlocked;
3567   tso->block_info.closure = NULL;
3568   appendToRunQueue(cap,tso);
3569 }
3570 #endif
3571
3572 /* -----------------------------------------------------------------------------
3573  * checkBlackHoles()
3574  *
3575  * Check the blackhole_queue for threads that can be woken up.  We do
3576  * this periodically: before every GC, and whenever the run queue is
3577  * empty.
3578  *
3579  * An elegant solution might be to just wake up all the blocked
3580  * threads with awakenBlockedQueue occasionally: they'll go back to
3581  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3582  * doesn't give us a way to tell whether we've actually managed to
3583  * wake up any threads, so we would be busy-waiting.
3584  *
3585  * -------------------------------------------------------------------------- */
3586
3587 static rtsBool
3588 checkBlackHoles (Capability *cap)
3589 {
3590     StgTSO **prev, *t;
3591     rtsBool any_woke_up = rtsFalse;
3592     StgHalfWord type;
3593
3594     // blackhole_queue is global:
3595     ASSERT_LOCK_HELD(&sched_mutex);
3596
3597     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3598
3599     // ASSUMES: sched_mutex
3600     prev = &blackhole_queue;
3601     t = blackhole_queue;
3602     while (t != END_TSO_QUEUE) {
3603         ASSERT(t->why_blocked == BlockedOnBlackHole);
3604         type = get_itbl(t->block_info.closure)->type;
3605         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3606             IF_DEBUG(sanity,checkTSO(t));
3607             t = unblockOne(cap, t);
3608             // urk, the threads migrate to the current capability
3609             // here, but we'd like to keep them on the original one.
3610             *prev = t;
3611             any_woke_up = rtsTrue;
3612         } else {
3613             prev = &t->link;
3614             t = t->link;
3615         }
3616     }
3617
3618     return any_woke_up;
3619 }
3620
3621 /* -----------------------------------------------------------------------------
3622  * raiseAsync()
3623  *
3624  * The following function implements the magic for raising an
3625  * asynchronous exception in an existing thread.
3626  *
3627  * We first remove the thread from any queue on which it might be
3628  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3629  *
3630  * We strip the stack down to the innermost CATCH_FRAME, building
3631  * thunks in the heap for all the active computations, so they can 
3632  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3633  * an application of the handler to the exception, and push it on
3634  * the top of the stack.
3635  * 
3636  * How exactly do we save all the active computations?  We create an
3637  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3638  * AP_STACKs pushes everything from the corresponding update frame
3639  * upwards onto the stack.  (Actually, it pushes everything up to the
3640  * next update frame plus a pointer to the next AP_STACK object.
3641  * Entering the next AP_STACK object pushes more onto the stack until we
3642  * reach the last AP_STACK object - at which point the stack should look
3643  * exactly as it did when we killed the TSO and we can continue
3644  * execution by entering the closure on top of the stack.
3645  *
3646  * We can also kill a thread entirely - this happens if either (a) the 
3647  * exception passed to raiseAsync is NULL, or (b) there's no
3648  * CATCH_FRAME on the stack.  In either case, we strip the entire
3649  * stack and replace the thread with a zombie.
3650  *
3651  * ToDo: in THREADED_RTS mode, this function is only safe if either
3652  * (a) we hold all the Capabilities (eg. in GC, or if there is only
3653  * one Capability), or (b) we own the Capability that the TSO is
3654  * currently blocked on or on the run queue of.
3655  *
3656  * -------------------------------------------------------------------------- */
3657  
3658 void
3659 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3660 {
3661     raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3662 }
3663
3664 void
3665 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3666 {
3667     raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3668 }
3669
3670 static void
3671 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3672             rtsBool stop_at_atomically, StgPtr stop_here)
3673 {
3674     StgRetInfoTable *info;
3675     StgPtr sp, frame;
3676     nat i;
3677   
3678     // Thread already dead?
3679     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3680         return;
3681     }
3682
3683     IF_DEBUG(scheduler, 
3684              sched_belch("raising exception in thread %ld.", (long)tso->id));
3685     
3686     // Remove it from any blocking queues
3687     unblockThread(cap,tso);
3688
3689     // mark it dirty; we're about to change its stack.
3690     dirtyTSO(tso);
3691
3692     sp = tso->sp;
3693     
3694     // The stack freezing code assumes there's a closure pointer on
3695     // the top of the stack, so we have to arrange that this is the case...
3696     //
3697     if (sp[0] == (W_)&stg_enter_info) {
3698         sp++;
3699     } else {
3700         sp--;
3701         sp[0] = (W_)&stg_dummy_ret_closure;
3702     }
3703
3704     frame = sp + 1;
3705     while (stop_here == NULL || frame < stop_here) {
3706
3707         // 1. Let the top of the stack be the "current closure"
3708         //
3709         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3710         // CATCH_FRAME.
3711         //
3712         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3713         // current closure applied to the chunk of stack up to (but not
3714         // including) the update frame.  This closure becomes the "current
3715         // closure".  Go back to step 2.
3716         //
3717         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3718         // top of the stack applied to the exception.
3719         // 
3720         // 5. If it's a STOP_FRAME, then kill the thread.
3721         // 
3722         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3723         // transaction
3724        
3725         info = get_ret_itbl((StgClosure *)frame);
3726
3727         switch (info->i.type) {
3728
3729         case UPDATE_FRAME:
3730         {
3731             StgAP_STACK * ap;
3732             nat words;
3733             
3734             // First build an AP_STACK consisting of the stack chunk above the
3735             // current update frame, with the top word on the stack as the
3736             // fun field.
3737             //
3738             words = frame - sp - 1;
3739             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3740             
3741             ap->size = words;
3742             ap->fun  = (StgClosure *)sp[0];
3743             sp++;
3744             for(i=0; i < (nat)words; ++i) {
3745                 ap->payload[i] = (StgClosure *)*sp++;
3746             }
3747             
3748             SET_HDR(ap,&stg_AP_STACK_info,
3749                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3750             TICK_ALLOC_UP_THK(words+1,0);
3751             
3752             IF_DEBUG(scheduler,
3753                      debugBelch("sched: Updating ");
3754                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3755                      debugBelch(" with ");
3756                      printObj((StgClosure *)ap);
3757                 );
3758
3759             // Replace the updatee with an indirection
3760             //
3761             // Warning: if we're in a loop, more than one update frame on
3762             // the stack may point to the same object.  Be careful not to
3763             // overwrite an IND_OLDGEN in this case, because we'll screw
3764             // up the mutable lists.  To be on the safe side, don't
3765             // overwrite any kind of indirection at all.  See also
3766             // threadSqueezeStack in GC.c, where we have to make a similar
3767             // check.
3768             //
3769             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3770                 // revert the black hole
3771                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3772                                (StgClosure *)ap);
3773             }
3774             sp += sizeofW(StgUpdateFrame) - 1;
3775             sp[0] = (W_)ap; // push onto stack
3776             frame = sp + 1;
3777             continue; //no need to bump frame
3778         }
3779
3780         case STOP_FRAME:
3781             // We've stripped the entire stack, the thread is now dead.
3782             tso->what_next = ThreadKilled;
3783             tso->sp = frame + sizeofW(StgStopFrame);
3784             return;
3785
3786         case CATCH_FRAME:
3787             // If we find a CATCH_FRAME, and we've got an exception to raise,
3788             // then build the THUNK raise(exception), and leave it on
3789             // top of the CATCH_FRAME ready to enter.
3790             //
3791         {
3792 #ifdef PROFILING
3793             StgCatchFrame *cf = (StgCatchFrame *)frame;
3794 #endif
3795             StgThunk *raise;
3796             
3797             if (exception == NULL) break;
3798
3799             // we've got an exception to raise, so let's pass it to the
3800             // handler in this frame.
3801             //
3802             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3803             TICK_ALLOC_SE_THK(1,0);
3804             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3805             raise->payload[0] = exception;
3806             
3807             // throw away the stack from Sp up to the CATCH_FRAME.
3808             //
3809             sp = frame - 1;
3810             
3811             /* Ensure that async excpetions are blocked now, so we don't get
3812              * a surprise exception before we get around to executing the
3813              * handler.
3814              */
3815             if (tso->blocked_exceptions == NULL) {
3816                 tso->blocked_exceptions = END_TSO_QUEUE;
3817             }
3818
3819             /* Put the newly-built THUNK on top of the stack, ready to execute
3820              * when the thread restarts.
3821              */
3822             sp[0] = (W_)raise;
3823             sp[-1] = (W_)&stg_enter_info;
3824             tso->sp = sp-1;
3825             tso->what_next = ThreadRunGHC;
3826             IF_DEBUG(sanity, checkTSO(tso));
3827             return;
3828         }
3829             
3830         case ATOMICALLY_FRAME:
3831             if (stop_at_atomically) {
3832                 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3833                 stmCondemnTransaction(cap, tso -> trec);
3834 #ifdef REG_R1
3835                 tso->sp = frame;
3836 #else
3837                 // R1 is not a register: the return convention for IO in
3838                 // this case puts the return value on the stack, so we
3839                 // need to set up the stack to return to the atomically
3840                 // frame properly...
3841                 tso->sp = frame - 2;
3842                 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3843                 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3844 #endif
3845                 tso->what_next = ThreadRunGHC;
3846                 return;
3847             }
3848             // Not stop_at_atomically... fall through and abort the
3849             // transaction.
3850             
3851         case CATCH_RETRY_FRAME:
3852             // IF we find an ATOMICALLY_FRAME then we abort the
3853             // current transaction and propagate the exception.  In
3854             // this case (unlike ordinary exceptions) we do not care
3855             // whether the transaction is valid or not because its
3856             // possible validity cannot have caused the exception
3857             // and will not be visible after the abort.
3858             IF_DEBUG(stm,
3859                      debugBelch("Found atomically block delivering async exception\n"));
3860             StgTRecHeader *trec = tso -> trec;
3861             StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3862             stmAbortTransaction(cap, trec);
3863             tso -> trec = outer;
3864             break;
3865             
3866         default:
3867             break;
3868         }
3869
3870         // move on to the next stack frame
3871         frame += stack_frame_sizeW((StgClosure *)frame);
3872     }
3873
3874     // if we got here, then we stopped at stop_here
3875     ASSERT(stop_here != NULL);
3876 }
3877
3878 /* -----------------------------------------------------------------------------
3879    Deleting threads
3880
3881    This is used for interruption (^C) and forking, and corresponds to
3882    raising an exception but without letting the thread catch the
3883    exception.
3884    -------------------------------------------------------------------------- */
3885
3886 static void 
3887 deleteThread (Capability *cap, StgTSO *tso)
3888 {
3889   if (tso->why_blocked != BlockedOnCCall &&
3890       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3891       raiseAsync(cap,tso,NULL);
3892   }
3893 }
3894
3895 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3896 static void 
3897 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3898 { // for forkProcess only:
3899   // delete thread without giving it a chance to catch the KillThread exception
3900
3901   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3902       return;
3903   }
3904
3905   if (tso->why_blocked != BlockedOnCCall &&
3906       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3907       unblockThread(cap,tso);
3908   }
3909
3910   tso->what_next = ThreadKilled;
3911 }
3912 #endif
3913
3914 /* -----------------------------------------------------------------------------
3915    raiseExceptionHelper
3916    
3917    This function is called by the raise# primitve, just so that we can
3918    move some of the tricky bits of raising an exception from C-- into
3919    C.  Who knows, it might be a useful re-useable thing here too.
3920    -------------------------------------------------------------------------- */
3921
3922 StgWord
3923 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3924 {
3925     Capability *cap = regTableToCapability(reg);
3926     StgThunk *raise_closure = NULL;
3927     StgPtr p, next;
3928     StgRetInfoTable *info;
3929     //
3930     // This closure represents the expression 'raise# E' where E
3931     // is the exception raise.  It is used to overwrite all the
3932     // thunks which are currently under evaluataion.
3933     //
3934
3935     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
3936     // LDV profiling: stg_raise_info has THUNK as its closure
3937     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3938     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3939     // 1 does not cause any problem unless profiling is performed.
3940     // However, when LDV profiling goes on, we need to linearly scan
3941     // small object pool, where raise_closure is stored, so we should
3942     // use MIN_UPD_SIZE.
3943     //
3944     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3945     //                                 sizeofW(StgClosure)+1);
3946     //
3947
3948     //
3949     // Walk up the stack, looking for the catch frame.  On the way,
3950     // we update any closures pointed to from update frames with the
3951     // raise closure that we just built.
3952     //
3953     p = tso->sp;
3954     while(1) {
3955         info = get_ret_itbl((StgClosure *)p);
3956         next = p + stack_frame_sizeW((StgClosure *)p);
3957         switch (info->i.type) {
3958             
3959         case UPDATE_FRAME:
3960             // Only create raise_closure if we need to.
3961             if (raise_closure == NULL) {
3962                 raise_closure = 
3963                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3964                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3965                 raise_closure->payload[0] = exception;
3966             }
3967             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3968             p = next;
3969             continue;
3970
3971         case ATOMICALLY_FRAME:
3972             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3973             tso->sp = p;
3974             return ATOMICALLY_FRAME;
3975             
3976         case CATCH_FRAME:
3977             tso->sp = p;
3978             return CATCH_FRAME;
3979
3980         case CATCH_STM_FRAME:
3981             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3982             tso->sp = p;
3983             return CATCH_STM_FRAME;
3984             
3985         case STOP_FRAME:
3986             tso->sp = p;
3987             return STOP_FRAME;
3988
3989         case CATCH_RETRY_FRAME:
3990         default:
3991             p = next; 
3992             continue;
3993         }
3994     }
3995 }
3996
3997
3998 /* -----------------------------------------------------------------------------
3999    findRetryFrameHelper
4000
4001    This function is called by the retry# primitive.  It traverses the stack
4002    leaving tso->sp referring to the frame which should handle the retry.  
4003
4004    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
4005    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
4006
4007    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
4008    despite the similar implementation.
4009
4010    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
4011    not be created within memory transactions.
4012    -------------------------------------------------------------------------- */
4013
4014 StgWord
4015 findRetryFrameHelper (StgTSO *tso)
4016 {
4017   StgPtr           p, next;
4018   StgRetInfoTable *info;
4019
4020   p = tso -> sp;
4021   while (1) {
4022     info = get_ret_itbl((StgClosure *)p);
4023     next = p + stack_frame_sizeW((StgClosure *)p);
4024     switch (info->i.type) {
4025       
4026     case ATOMICALLY_FRAME:
4027       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
4028       tso->sp = p;
4029       return ATOMICALLY_FRAME;
4030       
4031     case CATCH_RETRY_FRAME:
4032       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
4033       tso->sp = p;
4034       return CATCH_RETRY_FRAME;
4035       
4036     case CATCH_STM_FRAME:
4037     default:
4038       ASSERT(info->i.type != CATCH_FRAME);
4039       ASSERT(info->i.type != STOP_FRAME);
4040       p = next; 
4041       continue;
4042     }
4043   }
4044 }
4045
4046 /* -----------------------------------------------------------------------------
4047    resurrectThreads is called after garbage collection on the list of
4048    threads found to be garbage.  Each of these threads will be woken
4049    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4050    on an MVar, or NonTermination if the thread was blocked on a Black
4051    Hole.
4052
4053    Locks: assumes we hold *all* the capabilities.
4054    -------------------------------------------------------------------------- */
4055
4056 void
4057 resurrectThreads (StgTSO *threads)
4058 {
4059     StgTSO *tso, *next;
4060     Capability *cap;
4061
4062     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4063         next = tso->global_link;
4064         tso->global_link = all_threads;
4065         all_threads = tso;
4066         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4067         
4068         // Wake up the thread on the Capability it was last on for a
4069         // bound thread, or last_free_capability otherwise.
4070         if (tso->bound) {
4071             cap = tso->bound->cap;
4072         } else {
4073             cap = last_free_capability;
4074         }
4075         
4076         switch (tso->why_blocked) {
4077         case BlockedOnMVar:
4078         case BlockedOnException:
4079             /* Called by GC - sched_mutex lock is currently held. */
4080             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4081             break;
4082         case BlockedOnBlackHole:
4083             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4084             break;
4085         case BlockedOnSTM:
4086             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4087             break;
4088         case NotBlocked:
4089             /* This might happen if the thread was blocked on a black hole
4090              * belonging to a thread that we've just woken up (raiseAsync
4091              * can wake up threads, remember...).
4092              */
4093             continue;
4094         default:
4095             barf("resurrectThreads: thread blocked in a strange way");
4096         }
4097     }
4098 }
4099
4100 /* ----------------------------------------------------------------------------
4101  * Debugging: why is a thread blocked
4102  * [Also provides useful information when debugging threaded programs
4103  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4104    ------------------------------------------------------------------------- */
4105
4106 #if DEBUG
4107 static void
4108 printThreadBlockage(StgTSO *tso)
4109 {
4110   switch (tso->why_blocked) {
4111   case BlockedOnRead:
4112     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4113     break;
4114   case BlockedOnWrite:
4115     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4116     break;
4117 #if defined(mingw32_HOST_OS)
4118     case BlockedOnDoProc:
4119     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4120     break;
4121 #endif
4122   case BlockedOnDelay:
4123     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4124     break;
4125   case BlockedOnMVar:
4126     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4127     break;
4128   case BlockedOnException:
4129     debugBelch("is blocked on delivering an exception to thread %d",
4130             tso->block_info.tso->id);
4131     break;
4132   case BlockedOnBlackHole:
4133     debugBelch("is blocked on a black hole");
4134     break;
4135   case NotBlocked:
4136     debugBelch("is not blocked");
4137     break;
4138 #if defined(PARALLEL_HASKELL)
4139   case BlockedOnGA:
4140     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4141             tso->block_info.closure, info_type(tso->block_info.closure));
4142     break;
4143   case BlockedOnGA_NoSend:
4144     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4145             tso->block_info.closure, info_type(tso->block_info.closure));
4146     break;
4147 #endif
4148   case BlockedOnCCall:
4149     debugBelch("is blocked on an external call");
4150     break;
4151   case BlockedOnCCall_NoUnblockExc:
4152     debugBelch("is blocked on an external call (exceptions were already blocked)");
4153     break;
4154   case BlockedOnSTM:
4155     debugBelch("is blocked on an STM operation");
4156     break;
4157   default:
4158     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4159          tso->why_blocked, tso->id, tso);
4160   }
4161 }
4162
4163 void
4164 printThreadStatus(StgTSO *t)
4165 {
4166     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4167     {
4168       void *label = lookupThreadLabel(t->id);
4169       if (label) debugBelch("[\"%s\"] ",(char *)label);
4170     }
4171     if (t->what_next == ThreadRelocated) {
4172         debugBelch("has been relocated...\n");
4173     } else {
4174         switch (t->what_next) {
4175         case ThreadKilled:
4176             debugBelch("has been killed");
4177             break;
4178         case ThreadComplete:
4179             debugBelch("has completed");
4180             break;
4181         default:
4182             printThreadBlockage(t);
4183         }
4184         debugBelch("\n");
4185     }
4186 }
4187
4188 void
4189 printAllThreads(void)
4190 {
4191   StgTSO *t, *next;
4192   nat i;
4193   Capability *cap;
4194
4195 # if defined(GRAN)
4196   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4197   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4198                        time_string, rtsFalse/*no commas!*/);
4199
4200   debugBelch("all threads at [%s]:\n", time_string);
4201 # elif defined(PARALLEL_HASKELL)
4202   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4203   ullong_format_string(CURRENT_TIME,
4204                        time_string, rtsFalse/*no commas!*/);
4205
4206   debugBelch("all threads at [%s]:\n", time_string);
4207 # else
4208   debugBelch("all threads:\n");
4209 # endif
4210
4211   for (i = 0; i < n_capabilities; i++) {
4212       cap = &capabilities[i];
4213       debugBelch("threads on capability %d:\n", cap->no);
4214       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4215           printThreadStatus(t);
4216       }
4217   }
4218
4219   debugBelch("other threads:\n");
4220   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4221       if (t->why_blocked != NotBlocked) {
4222           printThreadStatus(t);
4223       }
4224       if (t->what_next == ThreadRelocated) {
4225           next = t->link;
4226       } else {
4227           next = t->global_link;
4228       }
4229   }
4230 }
4231
4232 // useful from gdb
4233 void 
4234 printThreadQueue(StgTSO *t)
4235 {
4236     nat i = 0;
4237     for (; t != END_TSO_QUEUE; t = t->link) {
4238         printThreadStatus(t);
4239         i++;
4240     }
4241     debugBelch("%d threads on queue\n", i);
4242 }
4243
4244 /* 
4245    Print a whole blocking queue attached to node (debugging only).
4246 */
4247 # if defined(PARALLEL_HASKELL)
4248 void 
4249 print_bq (StgClosure *node)
4250 {
4251   StgBlockingQueueElement *bqe;
4252   StgTSO *tso;
4253   rtsBool end;
4254
4255   debugBelch("## BQ of closure %p (%s): ",
4256           node, info_type(node));
4257
4258   /* should cover all closures that may have a blocking queue */
4259   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4260          get_itbl(node)->type == FETCH_ME_BQ ||
4261          get_itbl(node)->type == RBH ||
4262          get_itbl(node)->type == MVAR);
4263     
4264   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4265
4266   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4267 }
4268
4269 /* 
4270    Print a whole blocking queue starting with the element bqe.
4271 */
4272 void 
4273 print_bqe (StgBlockingQueueElement *bqe)
4274 {
4275   rtsBool end;
4276
4277   /* 
4278      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4279   */
4280   for (end = (bqe==END_BQ_QUEUE);
4281        !end; // iterate until bqe points to a CONSTR
4282        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4283        bqe = end ? END_BQ_QUEUE : bqe->link) {
4284     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4285     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4286     /* types of closures that may appear in a blocking queue */
4287     ASSERT(get_itbl(bqe)->type == TSO ||           
4288            get_itbl(bqe)->type == BLOCKED_FETCH || 
4289            get_itbl(bqe)->type == CONSTR); 
4290     /* only BQs of an RBH end with an RBH_Save closure */
4291     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4292
4293     switch (get_itbl(bqe)->type) {
4294     case TSO:
4295       debugBelch(" TSO %u (%x),",
4296               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4297       break;
4298     case BLOCKED_FETCH:
4299       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4300               ((StgBlockedFetch *)bqe)->node, 
4301               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4302               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4303               ((StgBlockedFetch *)bqe)->ga.weight);
4304       break;
4305     case CONSTR:
4306       debugBelch(" %s (IP %p),",
4307               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4308                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4309                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4310                "RBH_Save_?"), get_itbl(bqe));
4311       break;
4312     default:
4313       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4314            info_type((StgClosure *)bqe)); // , node, info_type(node));
4315       break;
4316     }
4317   } /* for */
4318   debugBelch("\n");
4319 }
4320 # elif defined(GRAN)
4321 void 
4322 print_bq (StgClosure *node)
4323 {
4324   StgBlockingQueueElement *bqe;
4325   PEs node_loc, tso_loc;
4326   rtsBool end;
4327
4328   /* should cover all closures that may have a blocking queue */
4329   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4330          get_itbl(node)->type == FETCH_ME_BQ ||
4331          get_itbl(node)->type == RBH);
4332     
4333   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4334   node_loc = where_is(node);
4335
4336   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4337           node, info_type(node), node_loc);
4338
4339   /* 
4340      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4341   */
4342   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4343        !end; // iterate until bqe points to a CONSTR
4344        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4345     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4346     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4347     /* types of closures that may appear in a blocking queue */
4348     ASSERT(get_itbl(bqe)->type == TSO ||           
4349            get_itbl(bqe)->type == CONSTR); 
4350     /* only BQs of an RBH end with an RBH_Save closure */
4351     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4352
4353     tso_loc = where_is((StgClosure *)bqe);
4354     switch (get_itbl(bqe)->type) {
4355     case TSO:
4356       debugBelch(" TSO %d (%p) on [PE %d],",
4357               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4358       break;
4359     case CONSTR:
4360       debugBelch(" %s (IP %p),",
4361               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4362                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4363                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4364                "RBH_Save_?"), get_itbl(bqe));
4365       break;
4366     default:
4367       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4368            info_type((StgClosure *)bqe), node, info_type(node));
4369       break;
4370     }
4371   } /* for */
4372   debugBelch("\n");
4373 }
4374 # endif
4375
4376 #if defined(PARALLEL_HASKELL)
4377 static nat
4378 run_queue_len(void)
4379 {
4380     nat i;
4381     StgTSO *tso;
4382     
4383     for (i=0, tso=run_queue_hd; 
4384          tso != END_TSO_QUEUE;
4385          i++, tso=tso->link) {
4386         /* nothing */
4387     }
4388         
4389     return i;
4390 }
4391 #endif
4392
4393 void
4394 sched_belch(char *s, ...)
4395 {
4396     va_list ap;
4397     va_start(ap,s);
4398 #ifdef THREADED_RTS
4399     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4400 #elif defined(PARALLEL_HASKELL)
4401     debugBelch("== ");
4402 #else
4403     debugBelch("sched: ");
4404 #endif
4405     vdebugBelch(s, ap);
4406     debugBelch("\n");
4407     va_end(ap);
4408 }
4409
4410 #endif /* DEBUG */