make the smp way RTS-only, normal libraries now work with -smp
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 /* -----------------------------------------------------------------------------
76  * Global variables
77  * -------------------------------------------------------------------------- */
78
79 #if defined(GRAN)
80
81 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
82 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
83
84 /* 
85    In GranSim we have a runnable and a blocked queue for each processor.
86    In order to minimise code changes new arrays run_queue_hds/tls
87    are created. run_queue_hd is then a short cut (macro) for
88    run_queue_hds[CurrentProc] (see GranSim.h).
89    -- HWL
90 */
91 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
92 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
93 StgTSO *ccalling_threadss[MAX_PROC];
94 /* We use the same global list of threads (all_threads) in GranSim as in
95    the std RTS (i.e. we are cheating). However, we don't use this list in
96    the GranSim specific code at the moment (so we are only potentially
97    cheating).  */
98
99 #else /* !GRAN */
100
101 #if !defined(THREADED_RTS)
102 // Blocked/sleeping thrads
103 StgTSO *blocked_queue_hd = NULL;
104 StgTSO *blocked_queue_tl = NULL;
105 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
106 #endif
107
108 /* Threads blocked on blackholes.
109  * LOCK: sched_mutex+capability, or all capabilities
110  */
111 StgTSO *blackhole_queue = NULL;
112 #endif
113
114 /* The blackhole_queue should be checked for threads to wake up.  See
115  * Schedule.h for more thorough comment.
116  * LOCK: none (doesn't matter if we miss an update)
117  */
118 rtsBool blackholes_need_checking = rtsFalse;
119
120 /* Linked list of all threads.
121  * Used for detecting garbage collected threads.
122  * LOCK: sched_mutex+capability, or all capabilities
123  */
124 StgTSO *all_threads = NULL;
125
126 /* flag set by signal handler to precipitate a context switch
127  * LOCK: none (just an advisory flag)
128  */
129 int context_switch = 0;
130
131 /* flag that tracks whether we have done any execution in this time slice.
132  * LOCK: currently none, perhaps we should lock (but needs to be
133  * updated in the fast path of the scheduler).
134  */
135 nat recent_activity = ACTIVITY_YES;
136
137 /* if this flag is set as well, give up execution
138  * LOCK: none (changes once, from false->true)
139  */
140 rtsBool interrupted = rtsFalse;
141
142 /* Next thread ID to allocate.
143  * LOCK: sched_mutex
144  */
145 static StgThreadID next_thread_id = 1;
146
147 /* The smallest stack size that makes any sense is:
148  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
149  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
150  *  + 1                       (the closure to enter)
151  *  + 1                       (stg_ap_v_ret)
152  *  + 1                       (spare slot req'd by stg_ap_v_ret)
153  *
154  * A thread with this stack will bomb immediately with a stack
155  * overflow, which will increase its stack size.  
156  */
157 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
158
159 #if defined(GRAN)
160 StgTSO *CurrentTSO;
161 #endif
162
163 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
164  *  exists - earlier gccs apparently didn't.
165  *  -= chak
166  */
167 StgTSO dummy_tso;
168
169 /*
170  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
171  * in an MT setting, needed to signal that a worker thread shouldn't hang around
172  * in the scheduler when it is out of work.
173  */
174 rtsBool shutting_down_scheduler = rtsFalse;
175
176 /*
177  * This mutex protects most of the global scheduler data in
178  * the THREADED_RTS and (inc. SMP) runtime.
179  */
180 #if defined(THREADED_RTS)
181 Mutex sched_mutex;
182 #endif
183
184 #if defined(PARALLEL_HASKELL)
185 StgTSO *LastTSO;
186 rtsTime TimeOfLastYield;
187 rtsBool emitSchedule = rtsTrue;
188 #endif
189
190 /* -----------------------------------------------------------------------------
191  * static function prototypes
192  * -------------------------------------------------------------------------- */
193
194 static Capability *schedule (Capability *initialCapability, Task *task);
195
196 //
197 // These function all encapsulate parts of the scheduler loop, and are
198 // abstracted only to make the structure and control flow of the
199 // scheduler clearer.
200 //
201 static void schedulePreLoop (void);
202 #if defined(SMP)
203 static void schedulePushWork(Capability *cap, Task *task);
204 #endif
205 static void scheduleStartSignalHandlers (Capability *cap);
206 static void scheduleCheckBlockedThreads (Capability *cap);
207 static void scheduleCheckBlackHoles (Capability *cap);
208 static void scheduleDetectDeadlock (Capability *cap, Task *task);
209 #if defined(GRAN)
210 static StgTSO *scheduleProcessEvent(rtsEvent *event);
211 #endif
212 #if defined(PARALLEL_HASKELL)
213 static StgTSO *scheduleSendPendingMessages(void);
214 static void scheduleActivateSpark(void);
215 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
216 #endif
217 #if defined(PAR) || defined(GRAN)
218 static void scheduleGranParReport(void);
219 #endif
220 static void schedulePostRunThread(void);
221 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
222 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
223                                          StgTSO *t);
224 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
225                                     nat prev_what_next );
226 static void scheduleHandleThreadBlocked( StgTSO *t );
227 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
228                                              StgTSO *t );
229 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
230 static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);
231
232 static void unblockThread(Capability *cap, StgTSO *tso);
233 static rtsBool checkBlackHoles(Capability *cap);
234 static void AllRoots(evac_fn evac);
235
236 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
237
238 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
239                         rtsBool stop_at_atomically, StgPtr stop_here);
240
241 static void deleteThread (Capability *cap, StgTSO *tso);
242 static void deleteRunQueue (Capability *cap);
243
244 #ifdef DEBUG
245 static void printThreadBlockage(StgTSO *tso);
246 static void printThreadStatus(StgTSO *tso);
247 void printThreadQueue(StgTSO *tso);
248 #endif
249
250 #if defined(PARALLEL_HASKELL)
251 StgTSO * createSparkThread(rtsSpark spark);
252 StgTSO * activateSpark (rtsSpark spark);  
253 #endif
254
255 #ifdef DEBUG
256 static char *whatNext_strs[] = {
257   "(unknown)",
258   "ThreadRunGHC",
259   "ThreadInterpret",
260   "ThreadKilled",
261   "ThreadRelocated",
262   "ThreadComplete"
263 };
264 #endif
265
266 /* -----------------------------------------------------------------------------
267  * Putting a thread on the run queue: different scheduling policies
268  * -------------------------------------------------------------------------- */
269
270 STATIC_INLINE void
271 addToRunQueue( Capability *cap, StgTSO *t )
272 {
273 #if defined(PARALLEL_HASKELL)
274     if (RtsFlags.ParFlags.doFairScheduling) { 
275         // this does round-robin scheduling; good for concurrency
276         appendToRunQueue(cap,t);
277     } else {
278         // this does unfair scheduling; good for parallelism
279         pushOnRunQueue(cap,t);
280     }
281 #else
282     // this does round-robin scheduling; good for concurrency
283     appendToRunQueue(cap,t);
284 #endif
285 }
286
287 /* ---------------------------------------------------------------------------
288    Main scheduling loop.
289
290    We use round-robin scheduling, each thread returning to the
291    scheduler loop when one of these conditions is detected:
292
293       * out of heap space
294       * timer expires (thread yields)
295       * thread blocks
296       * thread ends
297       * stack overflow
298
299    GRAN version:
300      In a GranSim setup this loop iterates over the global event queue.
301      This revolves around the global event queue, which determines what 
302      to do next. Therefore, it's more complicated than either the 
303      concurrent or the parallel (GUM) setup.
304
305    GUM version:
306      GUM iterates over incoming messages.
307      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
308      and sends out a fish whenever it has nothing to do; in-between
309      doing the actual reductions (shared code below) it processes the
310      incoming messages and deals with delayed operations 
311      (see PendingFetches).
312      This is not the ugliest code you could imagine, but it's bloody close.
313
314    ------------------------------------------------------------------------ */
315
316 static Capability *
317 schedule (Capability *initialCapability, Task *task)
318 {
319   StgTSO *t;
320   Capability *cap;
321   StgThreadReturnCode ret;
322 #if defined(GRAN)
323   rtsEvent *event;
324 #elif defined(PARALLEL_HASKELL)
325   StgTSO *tso;
326   GlobalTaskId pe;
327   rtsBool receivedFinish = rtsFalse;
328 # if defined(DEBUG)
329   nat tp_size, sp_size; // stats only
330 # endif
331 #endif
332   nat prev_what_next;
333   rtsBool ready_to_gc;
334 #if defined(THREADED_RTS)
335   rtsBool first = rtsTrue;
336 #endif
337   
338   cap = initialCapability;
339
340   // Pre-condition: this task owns initialCapability.
341   // The sched_mutex is *NOT* held
342   // NB. on return, we still hold a capability.
343
344   IF_DEBUG(scheduler,
345            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
346                        task, initialCapability);
347       );
348
349   schedulePreLoop();
350
351   // -----------------------------------------------------------
352   // Scheduler loop starts here:
353
354 #if defined(PARALLEL_HASKELL)
355 #define TERMINATION_CONDITION        (!receivedFinish)
356 #elif defined(GRAN)
357 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
358 #else
359 #define TERMINATION_CONDITION        rtsTrue
360 #endif
361
362   while (TERMINATION_CONDITION) {
363
364 #if defined(GRAN)
365       /* Choose the processor with the next event */
366       CurrentProc = event->proc;
367       CurrentTSO = event->tso;
368 #endif
369
370 #if defined(THREADED_RTS)
371       if (first) {
372           // don't yield the first time, we want a chance to run this
373           // thread for a bit, even if there are others banging at the
374           // door.
375           first = rtsFalse;
376           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
377       } else {
378           // Yield the capability to higher-priority tasks if necessary.
379           yieldCapability(&cap, task);
380       }
381 #endif
382       
383 #ifdef SMP
384       schedulePushWork(cap,task);
385 #endif
386
387     // Check whether we have re-entered the RTS from Haskell without
388     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
389     // call).
390     if (cap->in_haskell) {
391           errorBelch("schedule: re-entered unsafely.\n"
392                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
393           stg_exit(EXIT_FAILURE);
394     }
395
396     //
397     // Test for interruption.  If interrupted==rtsTrue, then either
398     // we received a keyboard interrupt (^C), or the scheduler is
399     // trying to shut down all the tasks (shutting_down_scheduler) in
400     // the threaded RTS.
401     //
402     if (interrupted) {
403         deleteRunQueue(cap);
404 #if defined(SMP)
405         discardSparksCap(cap);
406 #endif
407         if (shutting_down_scheduler) {
408             IF_DEBUG(scheduler, sched_belch("shutting down"));
409             // If we are a worker, just exit.  If we're a bound thread
410             // then we will exit below when we've removed our TSO from
411             // the run queue.
412             if (task->tso == NULL && emptyRunQueue(cap)) {
413                 return cap;
414             }
415         } else {
416             IF_DEBUG(scheduler, sched_belch("interrupted"));
417         }
418     }
419
420 #if defined(SMP)
421     // If the run queue is empty, take a spark and turn it into a thread.
422     {
423         if (emptyRunQueue(cap)) {
424             StgClosure *spark;
425             spark = findSpark(cap);
426             if (spark != NULL) {
427                 IF_DEBUG(scheduler,
428                          sched_belch("turning spark of closure %p into a thread",
429                                      (StgClosure *)spark));
430                 createSparkThread(cap,spark);     
431             }
432         }
433     }
434 #endif // SMP
435
436     scheduleStartSignalHandlers(cap);
437
438     // Only check the black holes here if we've nothing else to do.
439     // During normal execution, the black hole list only gets checked
440     // at GC time, to avoid repeatedly traversing this possibly long
441     // list each time around the scheduler.
442     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
443
444     scheduleCheckBlockedThreads(cap);
445
446     scheduleDetectDeadlock(cap,task);
447
448     // Normally, the only way we can get here with no threads to
449     // run is if a keyboard interrupt received during 
450     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
451     // Additionally, it is not fatal for the
452     // threaded RTS to reach here with no threads to run.
453     //
454     // win32: might be here due to awaitEvent() being abandoned
455     // as a result of a console event having been delivered.
456     if ( emptyRunQueue(cap) ) {
457 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
458         ASSERT(interrupted);
459 #endif
460         continue; // nothing to do
461     }
462
463 #if defined(PARALLEL_HASKELL)
464     scheduleSendPendingMessages();
465     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
466         continue;
467
468 #if defined(SPARKS)
469     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
470 #endif
471
472     /* If we still have no work we need to send a FISH to get a spark
473        from another PE */
474     if (emptyRunQueue(cap)) {
475         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
476         ASSERT(rtsFalse); // should not happen at the moment
477     }
478     // from here: non-empty run queue.
479     //  TODO: merge above case with this, only one call processMessages() !
480     if (PacketsWaiting()) {  /* process incoming messages, if
481                                 any pending...  only in else
482                                 because getRemoteWork waits for
483                                 messages as well */
484         receivedFinish = processMessages();
485     }
486 #endif
487
488 #if defined(GRAN)
489     scheduleProcessEvent(event);
490 #endif
491
492     // 
493     // Get a thread to run
494     //
495     t = popRunQueue(cap);
496
497 #if defined(GRAN) || defined(PAR)
498     scheduleGranParReport(); // some kind of debuging output
499 #else
500     // Sanity check the thread we're about to run.  This can be
501     // expensive if there is lots of thread switching going on...
502     IF_DEBUG(sanity,checkTSO(t));
503 #endif
504
505 #if defined(THREADED_RTS)
506     // Check whether we can run this thread in the current task.
507     // If not, we have to pass our capability to the right task.
508     {
509         Task *bound = t->bound;
510       
511         if (bound) {
512             if (bound == task) {
513                 IF_DEBUG(scheduler,
514                          sched_belch("### Running thread %d in bound thread",
515                                      t->id));
516                 // yes, the Haskell thread is bound to the current native thread
517             } else {
518                 IF_DEBUG(scheduler,
519                          sched_belch("### thread %d bound to another OS thread",
520                                      t->id));
521                 // no, bound to a different Haskell thread: pass to that thread
522                 pushOnRunQueue(cap,t);
523                 continue;
524             }
525         } else {
526             // The thread we want to run is unbound.
527             if (task->tso) { 
528                 IF_DEBUG(scheduler,
529                          sched_belch("### this OS thread cannot run thread %d", t->id));
530                 // no, the current native thread is bound to a different
531                 // Haskell thread, so pass it to any worker thread
532                 pushOnRunQueue(cap,t);
533                 continue; 
534             }
535         }
536     }
537 #endif
538
539     cap->r.rCurrentTSO = t;
540     
541     /* context switches are initiated by the timer signal, unless
542      * the user specified "context switch as often as possible", with
543      * +RTS -C0
544      */
545     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
546         && !emptyThreadQueues(cap)) {
547         context_switch = 1;
548     }
549          
550 run_thread:
551
552     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
553                               (long)t->id, whatNext_strs[t->what_next]));
554
555 #if defined(PROFILING)
556     startHeapProfTimer();
557 #endif
558
559     // ----------------------------------------------------------------------
560     // Run the current thread 
561
562     prev_what_next = t->what_next;
563
564     errno = t->saved_errno;
565     cap->in_haskell = rtsTrue;
566
567     dirtyTSO(t);
568
569     recent_activity = ACTIVITY_YES;
570
571     switch (prev_what_next) {
572         
573     case ThreadKilled:
574     case ThreadComplete:
575         /* Thread already finished, return to scheduler. */
576         ret = ThreadFinished;
577         break;
578         
579     case ThreadRunGHC:
580     {
581         StgRegTable *r;
582         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
583         cap = regTableToCapability(r);
584         ret = r->rRet;
585         break;
586     }
587     
588     case ThreadInterpret:
589         cap = interpretBCO(cap);
590         ret = cap->r.rRet;
591         break;
592         
593     default:
594         barf("schedule: invalid what_next field");
595     }
596
597     cap->in_haskell = rtsFalse;
598
599     // The TSO might have moved, eg. if it re-entered the RTS and a GC
600     // happened.  So find the new location:
601     t = cap->r.rCurrentTSO;
602
603     // We have run some Haskell code: there might be blackhole-blocked
604     // threads to wake up now.
605     // Lock-free test here should be ok, we're just setting a flag.
606     if ( blackhole_queue != END_TSO_QUEUE ) {
607         blackholes_need_checking = rtsTrue;
608     }
609     
610     // And save the current errno in this thread.
611     // XXX: possibly bogus for SMP because this thread might already
612     // be running again, see code below.
613     t->saved_errno = errno;
614
615 #ifdef SMP
616     // If ret is ThreadBlocked, and this Task is bound to the TSO that
617     // blocked, we are in limbo - the TSO is now owned by whatever it
618     // is blocked on, and may in fact already have been woken up,
619     // perhaps even on a different Capability.  It may be the case
620     // that task->cap != cap.  We better yield this Capability
621     // immediately and return to normaility.
622     if (ret == ThreadBlocked) {
623         IF_DEBUG(scheduler,
624                  sched_belch("--<< thread %d (%s) stopped: blocked\n",
625                              t->id, whatNext_strs[t->what_next]));
626         continue;
627     }
628 #endif
629
630     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
631
632     // ----------------------------------------------------------------------
633     
634     // Costs for the scheduler are assigned to CCS_SYSTEM
635 #if defined(PROFILING)
636     stopHeapProfTimer();
637     CCCS = CCS_SYSTEM;
638 #endif
639     
640 #if defined(THREADED_RTS)
641     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
642 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
643     IF_DEBUG(scheduler,debugBelch("sched: "););
644 #endif
645     
646     schedulePostRunThread();
647
648     ready_to_gc = rtsFalse;
649
650     switch (ret) {
651     case HeapOverflow:
652         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
653         break;
654
655     case StackOverflow:
656         scheduleHandleStackOverflow(cap,task,t);
657         break;
658
659     case ThreadYielding:
660         if (scheduleHandleYield(cap, t, prev_what_next)) {
661             // shortcut for switching between compiler/interpreter:
662             goto run_thread; 
663         }
664         break;
665
666     case ThreadBlocked:
667         scheduleHandleThreadBlocked(t);
668         break;
669
670     case ThreadFinished:
671         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
672         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
673         break;
674
675     default:
676       barf("schedule: invalid thread return code %d", (int)ret);
677     }
678
679     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
680     if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
681   } /* end of while() */
682
683   IF_PAR_DEBUG(verbose,
684                debugBelch("== Leaving schedule() after having received Finish\n"));
685 }
686
687 /* ----------------------------------------------------------------------------
688  * Setting up the scheduler loop
689  * ------------------------------------------------------------------------- */
690
691 static void
692 schedulePreLoop(void)
693 {
694 #if defined(GRAN) 
695     /* set up first event to get things going */
696     /* ToDo: assign costs for system setup and init MainTSO ! */
697     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
698               ContinueThread, 
699               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
700     
701     IF_DEBUG(gran,
702              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
703                         CurrentTSO);
704              G_TSO(CurrentTSO, 5));
705     
706     if (RtsFlags.GranFlags.Light) {
707         /* Save current time; GranSim Light only */
708         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
709     }      
710 #endif
711 }
712
713 /* -----------------------------------------------------------------------------
714  * schedulePushWork()
715  *
716  * Push work to other Capabilities if we have some.
717  * -------------------------------------------------------------------------- */
718
719 #ifdef SMP
720 static void
721 schedulePushWork(Capability *cap USED_IF_SMP, 
722                  Task *task      USED_IF_SMP)
723 {
724     Capability *free_caps[n_capabilities], *cap0;
725     nat i, n_free_caps;
726
727     // Check whether we have more threads on our run queue, or sparks
728     // in our pool, that we could hand to another Capability.
729     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
730         && sparkPoolSizeCap(cap) < 2) {
731         return;
732     }
733
734     // First grab as many free Capabilities as we can.
735     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
736         cap0 = &capabilities[i];
737         if (cap != cap0 && tryGrabCapability(cap0,task)) {
738             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
739                 // it already has some work, we just grabbed it at 
740                 // the wrong moment.  Or maybe it's deadlocked!
741                 releaseCapability(cap0);
742             } else {
743                 free_caps[n_free_caps++] = cap0;
744             }
745         }
746     }
747
748     // we now have n_free_caps free capabilities stashed in
749     // free_caps[].  Share our run queue equally with them.  This is
750     // probably the simplest thing we could do; improvements we might
751     // want to do include:
752     //
753     //   - giving high priority to moving relatively new threads, on 
754     //     the gournds that they haven't had time to build up a
755     //     working set in the cache on this CPU/Capability.
756     //
757     //   - giving low priority to moving long-lived threads
758
759     if (n_free_caps > 0) {
760         StgTSO *prev, *t, *next;
761         rtsBool pushed_to_all;
762
763         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
764
765         i = 0;
766         pushed_to_all = rtsFalse;
767
768         if (cap->run_queue_hd != END_TSO_QUEUE) {
769             prev = cap->run_queue_hd;
770             t = prev->link;
771             prev->link = END_TSO_QUEUE;
772             for (; t != END_TSO_QUEUE; t = next) {
773                 next = t->link;
774                 t->link = END_TSO_QUEUE;
775                 if (t->what_next == ThreadRelocated
776                     || t->bound == task) { // don't move my bound thread
777                     prev->link = t;
778                     prev = t;
779                 } else if (i == n_free_caps) {
780                     pushed_to_all = rtsTrue;
781                     i = 0;
782                     // keep one for us
783                     prev->link = t;
784                     prev = t;
785                 } else {
786                     IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
787                     appendToRunQueue(free_caps[i],t);
788                     if (t->bound) { t->bound->cap = free_caps[i]; }
789                     i++;
790                 }
791             }
792             cap->run_queue_tl = prev;
793         }
794
795         // If there are some free capabilities that we didn't push any
796         // threads to, then try to push a spark to each one.
797         if (!pushed_to_all) {
798             StgClosure *spark;
799             // i is the next free capability to push to
800             for (; i < n_free_caps; i++) {
801                 if (emptySparkPoolCap(free_caps[i])) {
802                     spark = findSpark(cap);
803                     if (spark != NULL) {
804                         IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
805                         newSpark(&(free_caps[i]->r), spark);
806                     }
807                 }
808             }
809         }
810
811         // release the capabilities
812         for (i = 0; i < n_free_caps; i++) {
813             task->cap = free_caps[i];
814             releaseCapability(free_caps[i]);
815         }
816     }
817     task->cap = cap; // reset to point to our Capability.
818 }
819 #endif
820
821 /* ----------------------------------------------------------------------------
822  * Start any pending signal handlers
823  * ------------------------------------------------------------------------- */
824
825 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
826 static void
827 scheduleStartSignalHandlers(Capability *cap)
828 {
829     if (signals_pending()) { // safe outside the lock
830         startSignalHandlers(cap);
831     }
832 }
833 #else
834 static void
835 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
836 {
837 }
838 #endif
839
840 /* ----------------------------------------------------------------------------
841  * Check for blocked threads that can be woken up.
842  * ------------------------------------------------------------------------- */
843
844 static void
845 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
846 {
847 #if !defined(THREADED_RTS)
848     //
849     // Check whether any waiting threads need to be woken up.  If the
850     // run queue is empty, and there are no other tasks running, we
851     // can wait indefinitely for something to happen.
852     //
853     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
854     {
855         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
856     }
857 #endif
858 }
859
860
861 /* ----------------------------------------------------------------------------
862  * Check for threads blocked on BLACKHOLEs that can be woken up
863  * ------------------------------------------------------------------------- */
864 static void
865 scheduleCheckBlackHoles (Capability *cap)
866 {
867     if ( blackholes_need_checking ) // check without the lock first
868     {
869         ACQUIRE_LOCK(&sched_mutex);
870         if ( blackholes_need_checking ) {
871             checkBlackHoles(cap);
872             blackholes_need_checking = rtsFalse;
873         }
874         RELEASE_LOCK(&sched_mutex);
875     }
876 }
877
878 /* ----------------------------------------------------------------------------
879  * Detect deadlock conditions and attempt to resolve them.
880  * ------------------------------------------------------------------------- */
881
882 static void
883 scheduleDetectDeadlock (Capability *cap, Task *task)
884 {
885
886 #if defined(PARALLEL_HASKELL)
887     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
888     return;
889 #endif
890
891     /* 
892      * Detect deadlock: when we have no threads to run, there are no
893      * threads blocked, waiting for I/O, or sleeping, and all the
894      * other tasks are waiting for work, we must have a deadlock of
895      * some description.
896      */
897     if ( emptyThreadQueues(cap) )
898     {
899 #if defined(THREADED_RTS)
900         /* 
901          * In the threaded RTS, we only check for deadlock if there
902          * has been no activity in a complete timeslice.  This means
903          * we won't eagerly start a full GC just because we don't have
904          * any threads to run currently.
905          */
906         if (recent_activity != ACTIVITY_INACTIVE) return;
907 #endif
908
909         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
910
911         // Garbage collection can release some new threads due to
912         // either (a) finalizers or (b) threads resurrected because
913         // they are unreachable and will therefore be sent an
914         // exception.  Any threads thus released will be immediately
915         // runnable.
916         scheduleDoGC( cap, task, rtsTrue/*force  major GC*/ );
917         recent_activity = ACTIVITY_DONE_GC;
918         
919         if ( !emptyRunQueue(cap) ) return;
920
921 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
922         /* If we have user-installed signal handlers, then wait
923          * for signals to arrive rather then bombing out with a
924          * deadlock.
925          */
926         if ( anyUserHandlers() ) {
927             IF_DEBUG(scheduler, 
928                      sched_belch("still deadlocked, waiting for signals..."));
929
930             awaitUserSignals();
931
932             if (signals_pending()) {
933                 startSignalHandlers(cap);
934             }
935
936             // either we have threads to run, or we were interrupted:
937             ASSERT(!emptyRunQueue(cap) || interrupted);
938         }
939 #endif
940
941 #if !defined(THREADED_RTS)
942         /* Probably a real deadlock.  Send the current main thread the
943          * Deadlock exception.
944          */
945         if (task->tso) {
946             switch (task->tso->why_blocked) {
947             case BlockedOnSTM:
948             case BlockedOnBlackHole:
949             case BlockedOnException:
950             case BlockedOnMVar:
951                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
952                 return;
953             default:
954                 barf("deadlock: main thread blocked in a strange way");
955             }
956         }
957         return;
958 #endif
959     }
960 }
961
962 /* ----------------------------------------------------------------------------
963  * Process an event (GRAN only)
964  * ------------------------------------------------------------------------- */
965
966 #if defined(GRAN)
967 static StgTSO *
968 scheduleProcessEvent(rtsEvent *event)
969 {
970     StgTSO *t;
971
972     if (RtsFlags.GranFlags.Light)
973       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
974
975     /* adjust time based on time-stamp */
976     if (event->time > CurrentTime[CurrentProc] &&
977         event->evttype != ContinueThread)
978       CurrentTime[CurrentProc] = event->time;
979     
980     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
981     if (!RtsFlags.GranFlags.Light)
982       handleIdlePEs();
983
984     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
985
986     /* main event dispatcher in GranSim */
987     switch (event->evttype) {
988       /* Should just be continuing execution */
989     case ContinueThread:
990       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
991       /* ToDo: check assertion
992       ASSERT(run_queue_hd != (StgTSO*)NULL &&
993              run_queue_hd != END_TSO_QUEUE);
994       */
995       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
996       if (!RtsFlags.GranFlags.DoAsyncFetch &&
997           procStatus[CurrentProc]==Fetching) {
998         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
999               CurrentTSO->id, CurrentTSO, CurrentProc);
1000         goto next_thread;
1001       } 
1002       /* Ignore ContinueThreads for completed threads */
1003       if (CurrentTSO->what_next == ThreadComplete) {
1004         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1005               CurrentTSO->id, CurrentTSO, CurrentProc);
1006         goto next_thread;
1007       } 
1008       /* Ignore ContinueThreads for threads that are being migrated */
1009       if (PROCS(CurrentTSO)==Nowhere) { 
1010         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1011               CurrentTSO->id, CurrentTSO, CurrentProc);
1012         goto next_thread;
1013       }
1014       /* The thread should be at the beginning of the run queue */
1015       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1016         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1017               CurrentTSO->id, CurrentTSO, CurrentProc);
1018         break; // run the thread anyway
1019       }
1020       /*
1021       new_event(proc, proc, CurrentTime[proc],
1022                 FindWork,
1023                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1024       goto next_thread; 
1025       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1026       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1027
1028     case FetchNode:
1029       do_the_fetchnode(event);
1030       goto next_thread;             /* handle next event in event queue  */
1031       
1032     case GlobalBlock:
1033       do_the_globalblock(event);
1034       goto next_thread;             /* handle next event in event queue  */
1035       
1036     case FetchReply:
1037       do_the_fetchreply(event);
1038       goto next_thread;             /* handle next event in event queue  */
1039       
1040     case UnblockThread:   /* Move from the blocked queue to the tail of */
1041       do_the_unblock(event);
1042       goto next_thread;             /* handle next event in event queue  */
1043       
1044     case ResumeThread:  /* Move from the blocked queue to the tail of */
1045       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1046       event->tso->gran.blocktime += 
1047         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1048       do_the_startthread(event);
1049       goto next_thread;             /* handle next event in event queue  */
1050       
1051     case StartThread:
1052       do_the_startthread(event);
1053       goto next_thread;             /* handle next event in event queue  */
1054       
1055     case MoveThread:
1056       do_the_movethread(event);
1057       goto next_thread;             /* handle next event in event queue  */
1058       
1059     case MoveSpark:
1060       do_the_movespark(event);
1061       goto next_thread;             /* handle next event in event queue  */
1062       
1063     case FindWork:
1064       do_the_findwork(event);
1065       goto next_thread;             /* handle next event in event queue  */
1066       
1067     default:
1068       barf("Illegal event type %u\n", event->evttype);
1069     }  /* switch */
1070     
1071     /* This point was scheduler_loop in the old RTS */
1072
1073     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1074
1075     TimeOfLastEvent = CurrentTime[CurrentProc];
1076     TimeOfNextEvent = get_time_of_next_event();
1077     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1078     // CurrentTSO = ThreadQueueHd;
1079
1080     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1081                          TimeOfNextEvent));
1082
1083     if (RtsFlags.GranFlags.Light) 
1084       GranSimLight_leave_system(event, &ActiveTSO); 
1085
1086     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1087
1088     IF_DEBUG(gran, 
1089              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1090
1091     /* in a GranSim setup the TSO stays on the run queue */
1092     t = CurrentTSO;
1093     /* Take a thread from the run queue. */
1094     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1095
1096     IF_DEBUG(gran, 
1097              debugBelch("GRAN: About to run current thread, which is\n");
1098              G_TSO(t,5));
1099
1100     context_switch = 0; // turned on via GranYield, checking events and time slice
1101
1102     IF_DEBUG(gran, 
1103              DumpGranEvent(GR_SCHEDULE, t));
1104
1105     procStatus[CurrentProc] = Busy;
1106 }
1107 #endif // GRAN
1108
1109 /* ----------------------------------------------------------------------------
1110  * Send pending messages (PARALLEL_HASKELL only)
1111  * ------------------------------------------------------------------------- */
1112
1113 #if defined(PARALLEL_HASKELL)
1114 static StgTSO *
1115 scheduleSendPendingMessages(void)
1116 {
1117     StgSparkPool *pool;
1118     rtsSpark spark;
1119     StgTSO *t;
1120
1121 # if defined(PAR) // global Mem.Mgmt., omit for now
1122     if (PendingFetches != END_BF_QUEUE) {
1123         processFetches();
1124     }
1125 # endif
1126     
1127     if (RtsFlags.ParFlags.BufferTime) {
1128         // if we use message buffering, we must send away all message
1129         // packets which have become too old...
1130         sendOldBuffers(); 
1131     }
1132 }
1133 #endif
1134
1135 /* ----------------------------------------------------------------------------
1136  * Activate spark threads (PARALLEL_HASKELL only)
1137  * ------------------------------------------------------------------------- */
1138
1139 #if defined(PARALLEL_HASKELL)
1140 static void
1141 scheduleActivateSpark(void)
1142 {
1143 #if defined(SPARKS)
1144   ASSERT(emptyRunQueue());
1145 /* We get here if the run queue is empty and want some work.
1146    We try to turn a spark into a thread, and add it to the run queue,
1147    from where it will be picked up in the next iteration of the scheduler
1148    loop.
1149 */
1150
1151       /* :-[  no local threads => look out for local sparks */
1152       /* the spark pool for the current PE */
1153       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1154       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1155           pool->hd < pool->tl) {
1156         /* 
1157          * ToDo: add GC code check that we really have enough heap afterwards!!
1158          * Old comment:
1159          * If we're here (no runnable threads) and we have pending
1160          * sparks, we must have a space problem.  Get enough space
1161          * to turn one of those pending sparks into a
1162          * thread... 
1163          */
1164
1165         spark = findSpark(rtsFalse);            /* get a spark */
1166         if (spark != (rtsSpark) NULL) {
1167           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1168           IF_PAR_DEBUG(fish, // schedule,
1169                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1170                              tso->id, tso, advisory_thread_count));
1171
1172           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1173             IF_PAR_DEBUG(fish, // schedule,
1174                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1175                             spark));
1176             return rtsFalse; /* failed to generate a thread */
1177           }                  /* otherwise fall through & pick-up new tso */
1178         } else {
1179           IF_PAR_DEBUG(fish, // schedule,
1180                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1181                              spark_queue_len(pool)));
1182           return rtsFalse;  /* failed to generate a thread */
1183         }
1184         return rtsTrue;  /* success in generating a thread */
1185   } else { /* no more threads permitted or pool empty */
1186     return rtsFalse;  /* failed to generateThread */
1187   }
1188 #else
1189   tso = NULL; // avoid compiler warning only
1190   return rtsFalse;  /* dummy in non-PAR setup */
1191 #endif // SPARKS
1192 }
1193 #endif // PARALLEL_HASKELL
1194
1195 /* ----------------------------------------------------------------------------
1196  * Get work from a remote node (PARALLEL_HASKELL only)
1197  * ------------------------------------------------------------------------- */
1198     
1199 #if defined(PARALLEL_HASKELL)
1200 static rtsBool
1201 scheduleGetRemoteWork(rtsBool *receivedFinish)
1202 {
1203   ASSERT(emptyRunQueue());
1204
1205   if (RtsFlags.ParFlags.BufferTime) {
1206         IF_PAR_DEBUG(verbose, 
1207                 debugBelch("...send all pending data,"));
1208         {
1209           nat i;
1210           for (i=1; i<=nPEs; i++)
1211             sendImmediately(i); // send all messages away immediately
1212         }
1213   }
1214 # ifndef SPARKS
1215         //++EDEN++ idle() , i.e. send all buffers, wait for work
1216         // suppress fishing in EDEN... just look for incoming messages
1217         // (blocking receive)
1218   IF_PAR_DEBUG(verbose, 
1219                debugBelch("...wait for incoming messages...\n"));
1220   *receivedFinish = processMessages(); // blocking receive...
1221
1222         // and reenter scheduling loop after having received something
1223         // (return rtsFalse below)
1224
1225 # else /* activate SPARKS machinery */
1226 /* We get here, if we have no work, tried to activate a local spark, but still
1227    have no work. We try to get a remote spark, by sending a FISH message.
1228    Thread migration should be added here, and triggered when a sequence of 
1229    fishes returns without work. */
1230         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1231
1232       /* =8-[  no local sparks => look for work on other PEs */
1233         /*
1234          * We really have absolutely no work.  Send out a fish
1235          * (there may be some out there already), and wait for
1236          * something to arrive.  We clearly can't run any threads
1237          * until a SCHEDULE or RESUME arrives, and so that's what
1238          * we're hoping to see.  (Of course, we still have to
1239          * respond to other types of messages.)
1240          */
1241         rtsTime now = msTime() /*CURRENT_TIME*/;
1242         IF_PAR_DEBUG(verbose, 
1243                      debugBelch("--  now=%ld\n", now));
1244         IF_PAR_DEBUG(fish, // verbose,
1245              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1246                  (last_fish_arrived_at!=0 &&
1247                   last_fish_arrived_at+delay > now)) {
1248                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1249                      now, last_fish_arrived_at+delay, 
1250                      last_fish_arrived_at,
1251                      delay);
1252              });
1253   
1254         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1255             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1256           if (last_fish_arrived_at==0 ||
1257               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1258             /* outstandingFishes is set in sendFish, processFish;
1259                avoid flooding system with fishes via delay */
1260     next_fish_to_send_at = 0;  
1261   } else {
1262     /* ToDo: this should be done in the main scheduling loop to avoid the
1263              busy wait here; not so bad if fish delay is very small  */
1264     int iq = 0; // DEBUGGING -- HWL
1265     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1266     /* send a fish when ready, but process messages that arrive in the meantime */
1267     do {
1268       if (PacketsWaiting()) {
1269         iq++; // DEBUGGING
1270         *receivedFinish = processMessages();
1271       }
1272       now = msTime();
1273     } while (!*receivedFinish || now<next_fish_to_send_at);
1274     // JB: This means the fish could become obsolete, if we receive
1275     // work. Better check for work again? 
1276     // last line: while (!receivedFinish || !haveWork || now<...)
1277     // next line: if (receivedFinish || haveWork )
1278
1279     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1280       return rtsFalse;  // NB: this will leave scheduler loop
1281                         // immediately after return!
1282                           
1283     IF_PAR_DEBUG(fish, // verbose,
1284                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1285
1286   }
1287
1288     // JB: IMHO, this should all be hidden inside sendFish(...)
1289     /* pe = choosePE(); 
1290        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1291                 NEW_FISH_HUNGER);
1292
1293     // Global statistics: count no. of fishes
1294     if (RtsFlags.ParFlags.ParStats.Global &&
1295          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1296            globalParStats.tot_fish_mess++;
1297            }
1298     */ 
1299
1300   /* delayed fishes must have been sent by now! */
1301   next_fish_to_send_at = 0;  
1302   }
1303       
1304   *receivedFinish = processMessages();
1305 # endif /* SPARKS */
1306
1307  return rtsFalse;
1308  /* NB: this function always returns rtsFalse, meaning the scheduler
1309     loop continues with the next iteration; 
1310     rationale: 
1311       return code means success in finding work; we enter this function
1312       if there is no local work, thus have to send a fish which takes
1313       time until it arrives with work; in the meantime we should process
1314       messages in the main loop;
1315  */
1316 }
1317 #endif // PARALLEL_HASKELL
1318
1319 /* ----------------------------------------------------------------------------
1320  * PAR/GRAN: Report stats & debugging info(?)
1321  * ------------------------------------------------------------------------- */
1322
1323 #if defined(PAR) || defined(GRAN)
1324 static void
1325 scheduleGranParReport(void)
1326 {
1327   ASSERT(run_queue_hd != END_TSO_QUEUE);
1328
1329   /* Take a thread from the run queue, if we have work */
1330   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1331
1332     /* If this TSO has got its outport closed in the meantime, 
1333      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1334      * It has to be marked as TH_DEAD for this purpose.
1335      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1336
1337 JB: TODO: investigate wether state change field could be nuked
1338      entirely and replaced by the normal tso state (whatnext
1339      field). All we want to do is to kill tsos from outside.
1340      */
1341
1342     /* ToDo: write something to the log-file
1343     if (RTSflags.ParFlags.granSimStats && !sameThread)
1344         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1345
1346     CurrentTSO = t;
1347     */
1348     /* the spark pool for the current PE */
1349     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1350
1351     IF_DEBUG(scheduler, 
1352              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1353                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1354
1355     IF_PAR_DEBUG(fish,
1356              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1357                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1358
1359     if (RtsFlags.ParFlags.ParStats.Full && 
1360         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1361         (emitSchedule || // forced emit
1362          (t && LastTSO && t->id != LastTSO->id))) {
1363       /* 
1364          we are running a different TSO, so write a schedule event to log file
1365          NB: If we use fair scheduling we also have to write  a deschedule 
1366              event for LastTSO; with unfair scheduling we know that the
1367              previous tso has blocked whenever we switch to another tso, so
1368              we don't need it in GUM for now
1369       */
1370       IF_PAR_DEBUG(fish, // schedule,
1371                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1372
1373       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1374                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1375       emitSchedule = rtsFalse;
1376     }
1377 }     
1378 #endif
1379
1380 /* ----------------------------------------------------------------------------
1381  * After running a thread...
1382  * ------------------------------------------------------------------------- */
1383
1384 static void
1385 schedulePostRunThread(void)
1386 {
1387 #if defined(PAR)
1388     /* HACK 675: if the last thread didn't yield, make sure to print a 
1389        SCHEDULE event to the log file when StgRunning the next thread, even
1390        if it is the same one as before */
1391     LastTSO = t; 
1392     TimeOfLastYield = CURRENT_TIME;
1393 #endif
1394
1395   /* some statistics gathering in the parallel case */
1396
1397 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1398   switch (ret) {
1399     case HeapOverflow:
1400 # if defined(GRAN)
1401       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1402       globalGranStats.tot_heapover++;
1403 # elif defined(PAR)
1404       globalParStats.tot_heapover++;
1405 # endif
1406       break;
1407
1408      case StackOverflow:
1409 # if defined(GRAN)
1410       IF_DEBUG(gran, 
1411                DumpGranEvent(GR_DESCHEDULE, t));
1412       globalGranStats.tot_stackover++;
1413 # elif defined(PAR)
1414       // IF_DEBUG(par, 
1415       // DumpGranEvent(GR_DESCHEDULE, t);
1416       globalParStats.tot_stackover++;
1417 # endif
1418       break;
1419
1420     case ThreadYielding:
1421 # if defined(GRAN)
1422       IF_DEBUG(gran, 
1423                DumpGranEvent(GR_DESCHEDULE, t));
1424       globalGranStats.tot_yields++;
1425 # elif defined(PAR)
1426       // IF_DEBUG(par, 
1427       // DumpGranEvent(GR_DESCHEDULE, t);
1428       globalParStats.tot_yields++;
1429 # endif
1430       break; 
1431
1432     case ThreadBlocked:
1433 # if defined(GRAN)
1434       IF_DEBUG(scheduler,
1435                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1436                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1437                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1438                if (t->block_info.closure!=(StgClosure*)NULL)
1439                  print_bq(t->block_info.closure);
1440                debugBelch("\n"));
1441
1442       // ??? needed; should emit block before
1443       IF_DEBUG(gran, 
1444                DumpGranEvent(GR_DESCHEDULE, t)); 
1445       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1446       /*
1447         ngoq Dogh!
1448       ASSERT(procStatus[CurrentProc]==Busy || 
1449               ((procStatus[CurrentProc]==Fetching) && 
1450               (t->block_info.closure!=(StgClosure*)NULL)));
1451       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1452           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1453             procStatus[CurrentProc]==Fetching)) 
1454         procStatus[CurrentProc] = Idle;
1455       */
1456 # elif defined(PAR)
1457 //++PAR++  blockThread() writes the event (change?)
1458 # endif
1459     break;
1460
1461   case ThreadFinished:
1462     break;
1463
1464   default:
1465     barf("parGlobalStats: unknown return code");
1466     break;
1467     }
1468 #endif
1469 }
1470
1471 /* -----------------------------------------------------------------------------
1472  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1473  * -------------------------------------------------------------------------- */
1474
1475 static rtsBool
1476 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1477 {
1478     // did the task ask for a large block?
1479     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1480         // if so, get one and push it on the front of the nursery.
1481         bdescr *bd;
1482         lnat blocks;
1483         
1484         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1485         
1486         IF_DEBUG(scheduler,
1487                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1488                             (long)t->id, whatNext_strs[t->what_next], blocks));
1489         
1490         // don't do this if the nursery is (nearly) full, we'll GC first.
1491         if (cap->r.rCurrentNursery->link != NULL ||
1492             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1493                                                // if the nursery has only one block.
1494             
1495             ACQUIRE_SM_LOCK
1496             bd = allocGroup( blocks );
1497             RELEASE_SM_LOCK
1498             cap->r.rNursery->n_blocks += blocks;
1499             
1500             // link the new group into the list
1501             bd->link = cap->r.rCurrentNursery;
1502             bd->u.back = cap->r.rCurrentNursery->u.back;
1503             if (cap->r.rCurrentNursery->u.back != NULL) {
1504                 cap->r.rCurrentNursery->u.back->link = bd;
1505             } else {
1506 #if !defined(SMP)
1507                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1508                        g0s0 == cap->r.rNursery);
1509 #endif
1510                 cap->r.rNursery->blocks = bd;
1511             }             
1512             cap->r.rCurrentNursery->u.back = bd;
1513             
1514             // initialise it as a nursery block.  We initialise the
1515             // step, gen_no, and flags field of *every* sub-block in
1516             // this large block, because this is easier than making
1517             // sure that we always find the block head of a large
1518             // block whenever we call Bdescr() (eg. evacuate() and
1519             // isAlive() in the GC would both have to do this, at
1520             // least).
1521             { 
1522                 bdescr *x;
1523                 for (x = bd; x < bd + blocks; x++) {
1524                     x->step = cap->r.rNursery;
1525                     x->gen_no = 0;
1526                     x->flags = 0;
1527                 }
1528             }
1529             
1530             // This assert can be a killer if the app is doing lots
1531             // of large block allocations.
1532             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1533             
1534             // now update the nursery to point to the new block
1535             cap->r.rCurrentNursery = bd;
1536             
1537             // we might be unlucky and have another thread get on the
1538             // run queue before us and steal the large block, but in that
1539             // case the thread will just end up requesting another large
1540             // block.
1541             pushOnRunQueue(cap,t);
1542             return rtsFalse;  /* not actually GC'ing */
1543         }
1544     }
1545     
1546     IF_DEBUG(scheduler,
1547              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1548                         (long)t->id, whatNext_strs[t->what_next]));
1549 #if defined(GRAN)
1550     ASSERT(!is_on_queue(t,CurrentProc));
1551 #elif defined(PARALLEL_HASKELL)
1552     /* Currently we emit a DESCHEDULE event before GC in GUM.
1553        ToDo: either add separate event to distinguish SYSTEM time from rest
1554        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1555     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1556         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1557                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1558         emitSchedule = rtsTrue;
1559     }
1560 #endif
1561       
1562     pushOnRunQueue(cap,t);
1563     return rtsTrue;
1564     /* actual GC is done at the end of the while loop in schedule() */
1565 }
1566
1567 /* -----------------------------------------------------------------------------
1568  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1569  * -------------------------------------------------------------------------- */
1570
1571 static void
1572 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1573 {
1574     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1575                                   (long)t->id, whatNext_strs[t->what_next]));
1576     /* just adjust the stack for this thread, then pop it back
1577      * on the run queue.
1578      */
1579     { 
1580         /* enlarge the stack */
1581         StgTSO *new_t = threadStackOverflow(cap, t);
1582         
1583         /* The TSO attached to this Task may have moved, so update the
1584          * pointer to it.
1585          */
1586         if (task->tso == t) {
1587             task->tso = new_t;
1588         }
1589         pushOnRunQueue(cap,new_t);
1590     }
1591 }
1592
1593 /* -----------------------------------------------------------------------------
1594  * Handle a thread that returned to the scheduler with ThreadYielding
1595  * -------------------------------------------------------------------------- */
1596
1597 static rtsBool
1598 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1599 {
1600     // Reset the context switch flag.  We don't do this just before
1601     // running the thread, because that would mean we would lose ticks
1602     // during GC, which can lead to unfair scheduling (a thread hogs
1603     // the CPU because the tick always arrives during GC).  This way
1604     // penalises threads that do a lot of allocation, but that seems
1605     // better than the alternative.
1606     context_switch = 0;
1607     
1608     /* put the thread back on the run queue.  Then, if we're ready to
1609      * GC, check whether this is the last task to stop.  If so, wake
1610      * up the GC thread.  getThread will block during a GC until the
1611      * GC is finished.
1612      */
1613     IF_DEBUG(scheduler,
1614              if (t->what_next != prev_what_next) {
1615                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1616                             (long)t->id, whatNext_strs[t->what_next]);
1617              } else {
1618                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1619                             (long)t->id, whatNext_strs[t->what_next]);
1620              }
1621         );
1622     
1623     IF_DEBUG(sanity,
1624              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1625              checkTSO(t));
1626     ASSERT(t->link == END_TSO_QUEUE);
1627     
1628     // Shortcut if we're just switching evaluators: don't bother
1629     // doing stack squeezing (which can be expensive), just run the
1630     // thread.
1631     if (t->what_next != prev_what_next) {
1632         return rtsTrue;
1633     }
1634     
1635 #if defined(GRAN)
1636     ASSERT(!is_on_queue(t,CurrentProc));
1637       
1638     IF_DEBUG(sanity,
1639              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1640              checkThreadQsSanity(rtsTrue));
1641
1642 #endif
1643
1644     addToRunQueue(cap,t);
1645
1646 #if defined(GRAN)
1647     /* add a ContinueThread event to actually process the thread */
1648     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1649               ContinueThread,
1650               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1651     IF_GRAN_DEBUG(bq, 
1652                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1653                   G_EVENTQ(0);
1654                   G_CURR_THREADQ(0));
1655 #endif
1656     return rtsFalse;
1657 }
1658
1659 /* -----------------------------------------------------------------------------
1660  * Handle a thread that returned to the scheduler with ThreadBlocked
1661  * -------------------------------------------------------------------------- */
1662
1663 static void
1664 scheduleHandleThreadBlocked( StgTSO *t
1665 #if !defined(GRAN) && !defined(DEBUG)
1666     STG_UNUSED
1667 #endif
1668     )
1669 {
1670 #if defined(GRAN)
1671     IF_DEBUG(scheduler,
1672              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1673                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1674              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1675     
1676     // ??? needed; should emit block before
1677     IF_DEBUG(gran, 
1678              DumpGranEvent(GR_DESCHEDULE, t)); 
1679     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1680     /*
1681       ngoq Dogh!
1682       ASSERT(procStatus[CurrentProc]==Busy || 
1683       ((procStatus[CurrentProc]==Fetching) && 
1684       (t->block_info.closure!=(StgClosure*)NULL)));
1685       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1686       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1687       procStatus[CurrentProc]==Fetching)) 
1688       procStatus[CurrentProc] = Idle;
1689     */
1690 #elif defined(PAR)
1691     IF_DEBUG(scheduler,
1692              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1693                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1694     IF_PAR_DEBUG(bq,
1695                  
1696                  if (t->block_info.closure!=(StgClosure*)NULL) 
1697                  print_bq(t->block_info.closure));
1698     
1699     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1700     blockThread(t);
1701     
1702     /* whatever we schedule next, we must log that schedule */
1703     emitSchedule = rtsTrue;
1704     
1705 #else /* !GRAN */
1706
1707       // We don't need to do anything.  The thread is blocked, and it
1708       // has tidied up its stack and placed itself on whatever queue
1709       // it needs to be on.
1710
1711 #if !defined(SMP)
1712     ASSERT(t->why_blocked != NotBlocked);
1713              // This might not be true under SMP: we don't have
1714              // exclusive access to this TSO, so someone might have
1715              // woken it up by now.  This actually happens: try
1716              // conc023 +RTS -N2.
1717 #endif
1718
1719     IF_DEBUG(scheduler,
1720              debugBelch("--<< thread %d (%s) stopped: ", 
1721                         t->id, whatNext_strs[t->what_next]);
1722              printThreadBlockage(t);
1723              debugBelch("\n"));
1724     
1725     /* Only for dumping event to log file 
1726        ToDo: do I need this in GranSim, too?
1727        blockThread(t);
1728     */
1729 #endif
1730 }
1731
1732 /* -----------------------------------------------------------------------------
1733  * Handle a thread that returned to the scheduler with ThreadFinished
1734  * -------------------------------------------------------------------------- */
1735
1736 static rtsBool
1737 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1738 {
1739     /* Need to check whether this was a main thread, and if so,
1740      * return with the return value.
1741      *
1742      * We also end up here if the thread kills itself with an
1743      * uncaught exception, see Exception.cmm.
1744      */
1745     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1746                                   t->id, whatNext_strs[t->what_next]));
1747
1748 #if defined(GRAN)
1749       endThread(t, CurrentProc); // clean-up the thread
1750 #elif defined(PARALLEL_HASKELL)
1751       /* For now all are advisory -- HWL */
1752       //if(t->priority==AdvisoryPriority) ??
1753       advisory_thread_count--; // JB: Caution with this counter, buggy!
1754       
1755 # if defined(DIST)
1756       if(t->dist.priority==RevalPriority)
1757         FinishReval(t);
1758 # endif
1759     
1760 # if defined(EDENOLD)
1761       // the thread could still have an outport... (BUG)
1762       if (t->eden.outport != -1) {
1763       // delete the outport for the tso which has finished...
1764         IF_PAR_DEBUG(eden_ports,
1765                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1766                               t->eden.outport, t->id));
1767         deleteOPT(t);
1768       }
1769       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1770       if (t->eden.epid != -1) {
1771         IF_PAR_DEBUG(eden_ports,
1772                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1773                            t->id, t->eden.epid));
1774         removeTSOfromProcess(t);
1775       }
1776 # endif 
1777
1778 # if defined(PAR)
1779       if (RtsFlags.ParFlags.ParStats.Full &&
1780           !RtsFlags.ParFlags.ParStats.Suppressed) 
1781         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1782
1783       //  t->par only contains statistics: left out for now...
1784       IF_PAR_DEBUG(fish,
1785                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1786                               t->id,t,t->par.sparkname));
1787 # endif
1788 #endif // PARALLEL_HASKELL
1789
1790       //
1791       // Check whether the thread that just completed was a bound
1792       // thread, and if so return with the result.  
1793       //
1794       // There is an assumption here that all thread completion goes
1795       // through this point; we need to make sure that if a thread
1796       // ends up in the ThreadKilled state, that it stays on the run
1797       // queue so it can be dealt with here.
1798       //
1799
1800       if (t->bound) {
1801
1802           if (t->bound != task) {
1803 #if !defined(THREADED_RTS)
1804               // Must be a bound thread that is not the topmost one.  Leave
1805               // it on the run queue until the stack has unwound to the
1806               // point where we can deal with this.  Leaving it on the run
1807               // queue also ensures that the garbage collector knows about
1808               // this thread and its return value (it gets dropped from the
1809               // all_threads list so there's no other way to find it).
1810               appendToRunQueue(cap,t);
1811               return rtsFalse;
1812 #else
1813               // this cannot happen in the threaded RTS, because a
1814               // bound thread can only be run by the appropriate Task.
1815               barf("finished bound thread that isn't mine");
1816 #endif
1817           }
1818
1819           ASSERT(task->tso == t);
1820
1821           if (t->what_next == ThreadComplete) {
1822               if (task->ret) {
1823                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1824                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1825               }
1826               task->stat = Success;
1827           } else {
1828               if (task->ret) {
1829                   *(task->ret) = NULL;
1830               }
1831               if (interrupted) {
1832                   task->stat = Interrupted;
1833               } else {
1834                   task->stat = Killed;
1835               }
1836           }
1837 #ifdef DEBUG
1838           removeThreadLabel((StgWord)task->tso->id);
1839 #endif
1840           return rtsTrue; // tells schedule() to return
1841       }
1842
1843       return rtsFalse;
1844 }
1845
1846 /* -----------------------------------------------------------------------------
1847  * Perform a heap census, if PROFILING
1848  * -------------------------------------------------------------------------- */
1849
1850 static rtsBool
1851 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1852 {
1853 #if defined(PROFILING)
1854     // When we have +RTS -i0 and we're heap profiling, do a census at
1855     // every GC.  This lets us get repeatable runs for debugging.
1856     if (performHeapProfile ||
1857         (RtsFlags.ProfFlags.profileInterval==0 &&
1858          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1859         GarbageCollect(GetRoots, rtsTrue);
1860         heapCensus();
1861         performHeapProfile = rtsFalse;
1862         return rtsTrue;  // true <=> we already GC'd
1863     }
1864 #endif
1865     return rtsFalse;
1866 }
1867
1868 /* -----------------------------------------------------------------------------
1869  * Perform a garbage collection if necessary
1870  * -------------------------------------------------------------------------- */
1871
1872 static void
1873 scheduleDoGC( Capability *cap, Task *task USED_IF_SMP, rtsBool force_major )
1874 {
1875     StgTSO *t;
1876 #ifdef SMP
1877     static volatile StgWord waiting_for_gc;
1878     rtsBool was_waiting;
1879     nat i;
1880 #endif
1881
1882 #ifdef SMP
1883     // In order to GC, there must be no threads running Haskell code.
1884     // Therefore, the GC thread needs to hold *all* the capabilities,
1885     // and release them after the GC has completed.  
1886     //
1887     // This seems to be the simplest way: previous attempts involved
1888     // making all the threads with capabilities give up their
1889     // capabilities and sleep except for the *last* one, which
1890     // actually did the GC.  But it's quite hard to arrange for all
1891     // the other tasks to sleep and stay asleep.
1892     //
1893         
1894     was_waiting = cas(&waiting_for_gc, 0, 1);
1895     if (was_waiting) {
1896         do {
1897             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1898             yieldCapability(&cap,task);
1899         } while (waiting_for_gc);
1900         return;
1901     }
1902
1903     for (i=0; i < n_capabilities; i++) {
1904         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1905         if (cap != &capabilities[i]) {
1906             Capability *pcap = &capabilities[i];
1907             // we better hope this task doesn't get migrated to
1908             // another Capability while we're waiting for this one.
1909             // It won't, because load balancing happens while we have
1910             // all the Capabilities, but even so it's a slightly
1911             // unsavoury invariant.
1912             task->cap = pcap;
1913             context_switch = 1;
1914             waitForReturnCapability(&pcap, task);
1915             if (pcap != &capabilities[i]) {
1916                 barf("scheduleDoGC: got the wrong capability");
1917             }
1918         }
1919     }
1920
1921     waiting_for_gc = rtsFalse;
1922 #endif
1923
1924     /* Kick any transactions which are invalid back to their
1925      * atomically frames.  When next scheduled they will try to
1926      * commit, this commit will fail and they will retry.
1927      */
1928     { 
1929         StgTSO *next;
1930
1931         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1932             if (t->what_next == ThreadRelocated) {
1933                 next = t->link;
1934             } else {
1935                 next = t->global_link;
1936                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1937                     if (!stmValidateNestOfTransactions (t -> trec)) {
1938                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1939                         
1940                         // strip the stack back to the
1941                         // ATOMICALLY_FRAME, aborting the (nested)
1942                         // transaction, and saving the stack of any
1943                         // partially-evaluated thunks on the heap.
1944                         raiseAsync_(cap, t, NULL, rtsTrue, NULL);
1945                         
1946 #ifdef REG_R1
1947                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1948 #endif
1949                     }
1950                 }
1951             }
1952         }
1953     }
1954     
1955     // so this happens periodically:
1956     scheduleCheckBlackHoles(cap);
1957     
1958     IF_DEBUG(scheduler, printAllThreads());
1959
1960     /* everybody back, start the GC.
1961      * Could do it in this thread, or signal a condition var
1962      * to do it in another thread.  Either way, we need to
1963      * broadcast on gc_pending_cond afterward.
1964      */
1965 #if defined(THREADED_RTS)
1966     IF_DEBUG(scheduler,sched_belch("doing GC"));
1967 #endif
1968     GarbageCollect(GetRoots, force_major);
1969     
1970 #if defined(SMP)
1971     // release our stash of capabilities.
1972     for (i = 0; i < n_capabilities; i++) {
1973         if (cap != &capabilities[i]) {
1974             task->cap = &capabilities[i];
1975             releaseCapability(&capabilities[i]);
1976         }
1977     }
1978     task->cap = cap;
1979 #endif
1980
1981 #if defined(GRAN)
1982     /* add a ContinueThread event to continue execution of current thread */
1983     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1984               ContinueThread,
1985               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1986     IF_GRAN_DEBUG(bq, 
1987                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1988                   G_EVENTQ(0);
1989                   G_CURR_THREADQ(0));
1990 #endif /* GRAN */
1991 }
1992
1993 /* ---------------------------------------------------------------------------
1994  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1995  * used by Control.Concurrent for error checking.
1996  * ------------------------------------------------------------------------- */
1997  
1998 StgBool
1999 rtsSupportsBoundThreads(void)
2000 {
2001 #if defined(THREADED_RTS)
2002   return rtsTrue;
2003 #else
2004   return rtsFalse;
2005 #endif
2006 }
2007
2008 /* ---------------------------------------------------------------------------
2009  * isThreadBound(tso): check whether tso is bound to an OS thread.
2010  * ------------------------------------------------------------------------- */
2011  
2012 StgBool
2013 isThreadBound(StgTSO* tso USED_IF_THREADS)
2014 {
2015 #if defined(THREADED_RTS)
2016   return (tso->bound != NULL);
2017 #endif
2018   return rtsFalse;
2019 }
2020
2021 /* ---------------------------------------------------------------------------
2022  * Singleton fork(). Do not copy any running threads.
2023  * ------------------------------------------------------------------------- */
2024
2025 #if !defined(mingw32_HOST_OS) && !defined(SMP)
2026 #define FORKPROCESS_PRIMOP_SUPPORTED
2027 #endif
2028
2029 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2030 static void 
2031 deleteThreadImmediately(Capability *cap, StgTSO *tso);
2032 #endif
2033 StgInt
2034 forkProcess(HsStablePtr *entry
2035 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2036             STG_UNUSED
2037 #endif
2038            )
2039 {
2040 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2041     Task *task;
2042     pid_t pid;
2043     StgTSO* t,*next;
2044     Capability *cap;
2045     
2046     IF_DEBUG(scheduler,sched_belch("forking!"));
2047     
2048     // ToDo: for SMP, we should probably acquire *all* the capabilities
2049     cap = rts_lock();
2050     
2051     pid = fork();
2052     
2053     if (pid) { // parent
2054         
2055         // just return the pid
2056         rts_unlock(cap);
2057         return pid;
2058         
2059     } else { // child
2060         
2061         // delete all threads
2062         cap->run_queue_hd = END_TSO_QUEUE;
2063         cap->run_queue_tl = END_TSO_QUEUE;
2064         
2065         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2066             next = t->link;
2067             
2068             // don't allow threads to catch the ThreadKilled exception
2069             deleteThreadImmediately(cap,t);
2070         }
2071         
2072         // wipe the task list
2073         ACQUIRE_LOCK(&sched_mutex);
2074         for (task = all_tasks; task != NULL; task=task->all_link) {
2075             if (task != cap->running_task) discardTask(task);
2076         }
2077         RELEASE_LOCK(&sched_mutex);
2078
2079         cap->suspended_ccalling_tasks = NULL;
2080
2081 #if defined(THREADED_RTS)
2082         // wipe our spare workers list.
2083         cap->spare_workers = NULL;
2084         cap->returning_tasks_hd = NULL;
2085         cap->returning_tasks_tl = NULL;
2086 #endif
2087
2088         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2089         rts_checkSchedStatus("forkProcess",cap);
2090         
2091         rts_unlock(cap);
2092         hs_exit();                      // clean up and exit
2093         stg_exit(EXIT_SUCCESS);
2094     }
2095 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2096     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2097     return -1;
2098 #endif
2099 }
2100
2101 /* ---------------------------------------------------------------------------
2102  * Delete the threads on the run queue of the current capability.
2103  * ------------------------------------------------------------------------- */
2104    
2105 static void
2106 deleteRunQueue (Capability *cap)
2107 {
2108     StgTSO *t, *next;
2109     for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2110         ASSERT(t->what_next != ThreadRelocated);
2111         next = t->link;
2112         deleteThread(cap, t);
2113     }
2114 }
2115
2116 /* startThread and  insertThread are now in GranSim.c -- HWL */
2117
2118
2119 /* -----------------------------------------------------------------------------
2120    Managing the suspended_ccalling_tasks list.
2121    Locks required: sched_mutex
2122    -------------------------------------------------------------------------- */
2123
2124 STATIC_INLINE void
2125 suspendTask (Capability *cap, Task *task)
2126 {
2127     ASSERT(task->next == NULL && task->prev == NULL);
2128     task->next = cap->suspended_ccalling_tasks;
2129     task->prev = NULL;
2130     if (cap->suspended_ccalling_tasks) {
2131         cap->suspended_ccalling_tasks->prev = task;
2132     }
2133     cap->suspended_ccalling_tasks = task;
2134 }
2135
2136 STATIC_INLINE void
2137 recoverSuspendedTask (Capability *cap, Task *task)
2138 {
2139     if (task->prev) {
2140         task->prev->next = task->next;
2141     } else {
2142         ASSERT(cap->suspended_ccalling_tasks == task);
2143         cap->suspended_ccalling_tasks = task->next;
2144     }
2145     if (task->next) {
2146         task->next->prev = task->prev;
2147     }
2148     task->next = task->prev = NULL;
2149 }
2150
2151 /* ---------------------------------------------------------------------------
2152  * Suspending & resuming Haskell threads.
2153  * 
2154  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2155  * its capability before calling the C function.  This allows another
2156  * task to pick up the capability and carry on running Haskell
2157  * threads.  It also means that if the C call blocks, it won't lock
2158  * the whole system.
2159  *
2160  * The Haskell thread making the C call is put to sleep for the
2161  * duration of the call, on the susepended_ccalling_threads queue.  We
2162  * give out a token to the task, which it can use to resume the thread
2163  * on return from the C function.
2164  * ------------------------------------------------------------------------- */
2165    
2166 void *
2167 suspendThread (StgRegTable *reg)
2168 {
2169   Capability *cap;
2170   int saved_errno = errno;
2171   StgTSO *tso;
2172   Task *task;
2173
2174   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2175    */
2176   cap = regTableToCapability(reg);
2177
2178   task = cap->running_task;
2179   tso = cap->r.rCurrentTSO;
2180
2181   IF_DEBUG(scheduler,
2182            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2183
2184   // XXX this might not be necessary --SDM
2185   tso->what_next = ThreadRunGHC;
2186
2187   threadPaused(cap,tso);
2188
2189   if(tso->blocked_exceptions == NULL)  {
2190       tso->why_blocked = BlockedOnCCall;
2191       tso->blocked_exceptions = END_TSO_QUEUE;
2192   } else {
2193       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2194   }
2195
2196   // Hand back capability
2197   task->suspended_tso = tso;
2198
2199   ACQUIRE_LOCK(&cap->lock);
2200
2201   suspendTask(cap,task);
2202   cap->in_haskell = rtsFalse;
2203   releaseCapability_(cap);
2204   
2205   RELEASE_LOCK(&cap->lock);
2206
2207 #if defined(THREADED_RTS)
2208   /* Preparing to leave the RTS, so ensure there's a native thread/task
2209      waiting to take over.
2210   */
2211   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2212 #endif
2213
2214   errno = saved_errno;
2215   return task;
2216 }
2217
2218 StgRegTable *
2219 resumeThread (void *task_)
2220 {
2221     StgTSO *tso;
2222     Capability *cap;
2223     int saved_errno = errno;
2224     Task *task = task_;
2225
2226     cap = task->cap;
2227     // Wait for permission to re-enter the RTS with the result.
2228     waitForReturnCapability(&cap,task);
2229     // we might be on a different capability now... but if so, our
2230     // entry on the suspended_ccalling_tasks list will also have been
2231     // migrated.
2232
2233     // Remove the thread from the suspended list
2234     recoverSuspendedTask(cap,task);
2235
2236     tso = task->suspended_tso;
2237     task->suspended_tso = NULL;
2238     tso->link = END_TSO_QUEUE;
2239     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2240     
2241     if (tso->why_blocked == BlockedOnCCall) {
2242         awakenBlockedQueue(cap,tso->blocked_exceptions);
2243         tso->blocked_exceptions = NULL;
2244     }
2245     
2246     /* Reset blocking status */
2247     tso->why_blocked  = NotBlocked;
2248     
2249     cap->r.rCurrentTSO = tso;
2250     cap->in_haskell = rtsTrue;
2251     errno = saved_errno;
2252
2253     /* We might have GC'd, mark the TSO dirty again */
2254     dirtyTSO(tso);
2255
2256     return &cap->r;
2257 }
2258
2259 /* ---------------------------------------------------------------------------
2260  * Comparing Thread ids.
2261  *
2262  * This is used from STG land in the implementation of the
2263  * instances of Eq/Ord for ThreadIds.
2264  * ------------------------------------------------------------------------ */
2265
2266 int
2267 cmp_thread(StgPtr tso1, StgPtr tso2) 
2268
2269   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2270   StgThreadID id2 = ((StgTSO *)tso2)->id;
2271  
2272   if (id1 < id2) return (-1);
2273   if (id1 > id2) return 1;
2274   return 0;
2275 }
2276
2277 /* ---------------------------------------------------------------------------
2278  * Fetching the ThreadID from an StgTSO.
2279  *
2280  * This is used in the implementation of Show for ThreadIds.
2281  * ------------------------------------------------------------------------ */
2282 int
2283 rts_getThreadId(StgPtr tso) 
2284 {
2285   return ((StgTSO *)tso)->id;
2286 }
2287
2288 #ifdef DEBUG
2289 void
2290 labelThread(StgPtr tso, char *label)
2291 {
2292   int len;
2293   void *buf;
2294
2295   /* Caveat: Once set, you can only set the thread name to "" */
2296   len = strlen(label)+1;
2297   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2298   strncpy(buf,label,len);
2299   /* Update will free the old memory for us */
2300   updateThreadLabel(((StgTSO *)tso)->id,buf);
2301 }
2302 #endif /* DEBUG */
2303
2304 /* ---------------------------------------------------------------------------
2305    Create a new thread.
2306
2307    The new thread starts with the given stack size.  Before the
2308    scheduler can run, however, this thread needs to have a closure
2309    (and possibly some arguments) pushed on its stack.  See
2310    pushClosure() in Schedule.h.
2311
2312    createGenThread() and createIOThread() (in SchedAPI.h) are
2313    convenient packaged versions of this function.
2314
2315    currently pri (priority) is only used in a GRAN setup -- HWL
2316    ------------------------------------------------------------------------ */
2317 #if defined(GRAN)
2318 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2319 StgTSO *
2320 createThread(nat size, StgInt pri)
2321 #else
2322 StgTSO *
2323 createThread(Capability *cap, nat size)
2324 #endif
2325 {
2326     StgTSO *tso;
2327     nat stack_size;
2328
2329     /* sched_mutex is *not* required */
2330
2331     /* First check whether we should create a thread at all */
2332 #if defined(PARALLEL_HASKELL)
2333     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2334     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2335         threadsIgnored++;
2336         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2337                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2338         return END_TSO_QUEUE;
2339     }
2340     threadsCreated++;
2341 #endif
2342
2343 #if defined(GRAN)
2344     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2345 #endif
2346
2347     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2348
2349     /* catch ridiculously small stack sizes */
2350     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2351         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2352     }
2353
2354     stack_size = size - TSO_STRUCT_SIZEW;
2355     
2356     tso = (StgTSO *)allocateLocal(cap, size);
2357     TICK_ALLOC_TSO(stack_size, 0);
2358
2359     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2360 #if defined(GRAN)
2361     SET_GRAN_HDR(tso, ThisPE);
2362 #endif
2363
2364     // Always start with the compiled code evaluator
2365     tso->what_next = ThreadRunGHC;
2366
2367     tso->why_blocked  = NotBlocked;
2368     tso->blocked_exceptions = NULL;
2369     tso->flags = TSO_DIRTY;
2370     
2371     tso->saved_errno = 0;
2372     tso->bound = NULL;
2373     
2374     tso->stack_size     = stack_size;
2375     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2376                           - TSO_STRUCT_SIZEW;
2377     tso->sp             = (P_)&(tso->stack) + stack_size;
2378
2379     tso->trec = NO_TREC;
2380     
2381 #ifdef PROFILING
2382     tso->prof.CCCS = CCS_MAIN;
2383 #endif
2384     
2385   /* put a stop frame on the stack */
2386     tso->sp -= sizeofW(StgStopFrame);
2387     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2388     tso->link = END_TSO_QUEUE;
2389     
2390   // ToDo: check this
2391 #if defined(GRAN)
2392     /* uses more flexible routine in GranSim */
2393     insertThread(tso, CurrentProc);
2394 #else
2395     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2396      * from its creation
2397      */
2398 #endif
2399     
2400 #if defined(GRAN) 
2401     if (RtsFlags.GranFlags.GranSimStats.Full) 
2402         DumpGranEvent(GR_START,tso);
2403 #elif defined(PARALLEL_HASKELL)
2404     if (RtsFlags.ParFlags.ParStats.Full) 
2405         DumpGranEvent(GR_STARTQ,tso);
2406     /* HACk to avoid SCHEDULE 
2407        LastTSO = tso; */
2408 #endif
2409     
2410     /* Link the new thread on the global thread list.
2411      */
2412     ACQUIRE_LOCK(&sched_mutex);
2413     tso->id = next_thread_id++;  // while we have the mutex
2414     tso->global_link = all_threads;
2415     all_threads = tso;
2416     RELEASE_LOCK(&sched_mutex);
2417     
2418 #if defined(DIST)
2419     tso->dist.priority = MandatoryPriority; //by default that is...
2420 #endif
2421     
2422 #if defined(GRAN)
2423     tso->gran.pri = pri;
2424 # if defined(DEBUG)
2425     tso->gran.magic = TSO_MAGIC; // debugging only
2426 # endif
2427     tso->gran.sparkname   = 0;
2428     tso->gran.startedat   = CURRENT_TIME; 
2429     tso->gran.exported    = 0;
2430     tso->gran.basicblocks = 0;
2431     tso->gran.allocs      = 0;
2432     tso->gran.exectime    = 0;
2433     tso->gran.fetchtime   = 0;
2434     tso->gran.fetchcount  = 0;
2435     tso->gran.blocktime   = 0;
2436     tso->gran.blockcount  = 0;
2437     tso->gran.blockedat   = 0;
2438     tso->gran.globalsparks = 0;
2439     tso->gran.localsparks  = 0;
2440     if (RtsFlags.GranFlags.Light)
2441         tso->gran.clock  = Now; /* local clock */
2442     else
2443         tso->gran.clock  = 0;
2444     
2445     IF_DEBUG(gran,printTSO(tso));
2446 #elif defined(PARALLEL_HASKELL)
2447 # if defined(DEBUG)
2448     tso->par.magic = TSO_MAGIC; // debugging only
2449 # endif
2450     tso->par.sparkname   = 0;
2451     tso->par.startedat   = CURRENT_TIME; 
2452     tso->par.exported    = 0;
2453     tso->par.basicblocks = 0;
2454     tso->par.allocs      = 0;
2455     tso->par.exectime    = 0;
2456     tso->par.fetchtime   = 0;
2457     tso->par.fetchcount  = 0;
2458     tso->par.blocktime   = 0;
2459     tso->par.blockcount  = 0;
2460     tso->par.blockedat   = 0;
2461     tso->par.globalsparks = 0;
2462     tso->par.localsparks  = 0;
2463 #endif
2464     
2465 #if defined(GRAN)
2466     globalGranStats.tot_threads_created++;
2467     globalGranStats.threads_created_on_PE[CurrentProc]++;
2468     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2469     globalGranStats.tot_sq_probes++;
2470 #elif defined(PARALLEL_HASKELL)
2471     // collect parallel global statistics (currently done together with GC stats)
2472     if (RtsFlags.ParFlags.ParStats.Global &&
2473         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2474         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2475         globalParStats.tot_threads_created++;
2476     }
2477 #endif 
2478     
2479 #if defined(GRAN)
2480     IF_GRAN_DEBUG(pri,
2481                   sched_belch("==__ schedule: Created TSO %d (%p);",
2482                               CurrentProc, tso, tso->id));
2483 #elif defined(PARALLEL_HASKELL)
2484     IF_PAR_DEBUG(verbose,
2485                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2486                              (long)tso->id, tso, advisory_thread_count));
2487 #else
2488     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2489                                    (long)tso->id, (long)tso->stack_size));
2490 #endif    
2491     return tso;
2492 }
2493
2494 #if defined(PAR)
2495 /* RFP:
2496    all parallel thread creation calls should fall through the following routine.
2497 */
2498 StgTSO *
2499 createThreadFromSpark(rtsSpark spark) 
2500 { StgTSO *tso;
2501   ASSERT(spark != (rtsSpark)NULL);
2502 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2503   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2504   { threadsIgnored++;
2505     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2506           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2507     return END_TSO_QUEUE;
2508   }
2509   else
2510   { threadsCreated++;
2511     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2512     if (tso==END_TSO_QUEUE)     
2513       barf("createSparkThread: Cannot create TSO");
2514 #if defined(DIST)
2515     tso->priority = AdvisoryPriority;
2516 #endif
2517     pushClosure(tso,spark);
2518     addToRunQueue(tso);
2519     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2520   }
2521   return tso;
2522 }
2523 #endif
2524
2525 /*
2526   Turn a spark into a thread.
2527   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2528 */
2529 #if 0
2530 StgTSO *
2531 activateSpark (rtsSpark spark) 
2532 {
2533   StgTSO *tso;
2534
2535   tso = createSparkThread(spark);
2536   if (RtsFlags.ParFlags.ParStats.Full) {   
2537     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2538       IF_PAR_DEBUG(verbose,
2539                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2540                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2541   }
2542   // ToDo: fwd info on local/global spark to thread -- HWL
2543   // tso->gran.exported =  spark->exported;
2544   // tso->gran.locked =   !spark->global;
2545   // tso->gran.sparkname = spark->name;
2546
2547   return tso;
2548 }
2549 #endif
2550
2551 /* ---------------------------------------------------------------------------
2552  * scheduleThread()
2553  *
2554  * scheduleThread puts a thread on the end  of the runnable queue.
2555  * This will usually be done immediately after a thread is created.
2556  * The caller of scheduleThread must create the thread using e.g.
2557  * createThread and push an appropriate closure
2558  * on this thread's stack before the scheduler is invoked.
2559  * ------------------------------------------------------------------------ */
2560
2561 void
2562 scheduleThread(Capability *cap, StgTSO *tso)
2563 {
2564     // The thread goes at the *end* of the run-queue, to avoid possible
2565     // starvation of any threads already on the queue.
2566     appendToRunQueue(cap,tso);
2567 }
2568
2569 Capability *
2570 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2571 {
2572     Task *task;
2573
2574     // We already created/initialised the Task
2575     task = cap->running_task;
2576
2577     // This TSO is now a bound thread; make the Task and TSO
2578     // point to each other.
2579     tso->bound = task;
2580
2581     task->tso = tso;
2582     task->ret = ret;
2583     task->stat = NoStatus;
2584
2585     appendToRunQueue(cap,tso);
2586
2587     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2588
2589 #if defined(GRAN)
2590     /* GranSim specific init */
2591     CurrentTSO = m->tso;                // the TSO to run
2592     procStatus[MainProc] = Busy;        // status of main PE
2593     CurrentProc = MainProc;             // PE to run it on
2594 #endif
2595
2596     cap = schedule(cap,task);
2597
2598     ASSERT(task->stat != NoStatus);
2599     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2600
2601     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2602     return cap;
2603 }
2604
2605 /* ----------------------------------------------------------------------------
2606  * Starting Tasks
2607  * ------------------------------------------------------------------------- */
2608
2609 #if defined(THREADED_RTS)
2610 void
2611 workerStart(Task *task)
2612 {
2613     Capability *cap;
2614
2615     // See startWorkerTask().
2616     ACQUIRE_LOCK(&task->lock);
2617     cap = task->cap;
2618     RELEASE_LOCK(&task->lock);
2619
2620     // set the thread-local pointer to the Task:
2621     taskEnter(task);
2622
2623     // schedule() runs without a lock.
2624     cap = schedule(cap,task);
2625
2626     // On exit from schedule(), we have a Capability.
2627     releaseCapability(cap);
2628     taskStop(task);
2629 }
2630 #endif
2631
2632 /* ---------------------------------------------------------------------------
2633  * initScheduler()
2634  *
2635  * Initialise the scheduler.  This resets all the queues - if the
2636  * queues contained any threads, they'll be garbage collected at the
2637  * next pass.
2638  *
2639  * ------------------------------------------------------------------------ */
2640
2641 void 
2642 initScheduler(void)
2643 {
2644 #if defined(GRAN)
2645   nat i;
2646   for (i=0; i<=MAX_PROC; i++) {
2647     run_queue_hds[i]      = END_TSO_QUEUE;
2648     run_queue_tls[i]      = END_TSO_QUEUE;
2649     blocked_queue_hds[i]  = END_TSO_QUEUE;
2650     blocked_queue_tls[i]  = END_TSO_QUEUE;
2651     ccalling_threadss[i]  = END_TSO_QUEUE;
2652     blackhole_queue[i]    = END_TSO_QUEUE;
2653     sleeping_queue        = END_TSO_QUEUE;
2654   }
2655 #elif !defined(THREADED_RTS)
2656   blocked_queue_hd  = END_TSO_QUEUE;
2657   blocked_queue_tl  = END_TSO_QUEUE;
2658   sleeping_queue    = END_TSO_QUEUE;
2659 #endif
2660
2661   blackhole_queue   = END_TSO_QUEUE;
2662   all_threads       = END_TSO_QUEUE;
2663
2664   context_switch = 0;
2665   interrupted    = 0;
2666
2667   RtsFlags.ConcFlags.ctxtSwitchTicks =
2668       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2669       
2670 #if defined(THREADED_RTS)
2671   /* Initialise the mutex and condition variables used by
2672    * the scheduler. */
2673   initMutex(&sched_mutex);
2674 #endif
2675   
2676   ACQUIRE_LOCK(&sched_mutex);
2677
2678   /* A capability holds the state a native thread needs in
2679    * order to execute STG code. At least one capability is
2680    * floating around (only SMP builds have more than one).
2681    */
2682   initCapabilities();
2683
2684   initTaskManager();
2685
2686 #if defined(SMP) || defined(PARALLEL_HASKELL)
2687   initSparkPools();
2688 #endif
2689
2690 #if defined(SMP)
2691   /*
2692    * Eagerly start one worker to run each Capability, except for
2693    * Capability 0.  The idea is that we're probably going to start a
2694    * bound thread on Capability 0 pretty soon, so we don't want a
2695    * worker task hogging it.
2696    */
2697   { 
2698       nat i;
2699       Capability *cap;
2700       for (i = 1; i < n_capabilities; i++) {
2701           cap = &capabilities[i];
2702           ACQUIRE_LOCK(&cap->lock);
2703           startWorkerTask(cap, workerStart);
2704           RELEASE_LOCK(&cap->lock);
2705       }
2706   }
2707 #endif
2708
2709   RELEASE_LOCK(&sched_mutex);
2710 }
2711
2712 void
2713 exitScheduler( void )
2714 {
2715     interrupted = rtsTrue;
2716     shutting_down_scheduler = rtsTrue;
2717
2718 #if defined(THREADED_RTS)
2719     { 
2720         Task *task;
2721         nat i;
2722         
2723         ACQUIRE_LOCK(&sched_mutex);
2724         task = newBoundTask();
2725         RELEASE_LOCK(&sched_mutex);
2726
2727         for (i = 0; i < n_capabilities; i++) {
2728             shutdownCapability(&capabilities[i], task);
2729         }
2730         boundTaskExiting(task);
2731         stopTaskManager();
2732     }
2733 #endif
2734 }
2735
2736 /* ---------------------------------------------------------------------------
2737    Where are the roots that we know about?
2738
2739         - all the threads on the runnable queue
2740         - all the threads on the blocked queue
2741         - all the threads on the sleeping queue
2742         - all the thread currently executing a _ccall_GC
2743         - all the "main threads"
2744      
2745    ------------------------------------------------------------------------ */
2746
2747 /* This has to be protected either by the scheduler monitor, or by the
2748         garbage collection monitor (probably the latter).
2749         KH @ 25/10/99
2750 */
2751
2752 void
2753 GetRoots( evac_fn evac )
2754 {
2755     nat i;
2756     Capability *cap;
2757     Task *task;
2758
2759 #if defined(GRAN)
2760     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2761         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2762             evac((StgClosure **)&run_queue_hds[i]);
2763         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2764             evac((StgClosure **)&run_queue_tls[i]);
2765         
2766         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2767             evac((StgClosure **)&blocked_queue_hds[i]);
2768         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2769             evac((StgClosure **)&blocked_queue_tls[i]);
2770         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2771             evac((StgClosure **)&ccalling_threads[i]);
2772     }
2773
2774     markEventQueue();
2775
2776 #else /* !GRAN */
2777
2778     for (i = 0; i < n_capabilities; i++) {
2779         cap = &capabilities[i];
2780         evac((StgClosure **)&cap->run_queue_hd);
2781         evac((StgClosure **)&cap->run_queue_tl);
2782         
2783         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2784              task=task->next) {
2785             evac((StgClosure **)&task->suspended_tso);
2786         }
2787     }
2788     
2789 #if !defined(THREADED_RTS)
2790     evac((StgClosure **)&blocked_queue_hd);
2791     evac((StgClosure **)&blocked_queue_tl);
2792     evac((StgClosure **)&sleeping_queue);
2793 #endif 
2794 #endif
2795
2796     evac((StgClosure **)&blackhole_queue);
2797
2798 #if defined(SMP) || defined(PARALLEL_HASKELL) || defined(GRAN)
2799     markSparkQueue(evac);
2800 #endif
2801     
2802 #if defined(RTS_USER_SIGNALS)
2803     // mark the signal handlers (signals should be already blocked)
2804     markSignalHandlers(evac);
2805 #endif
2806 }
2807
2808 /* -----------------------------------------------------------------------------
2809    performGC
2810
2811    This is the interface to the garbage collector from Haskell land.
2812    We provide this so that external C code can allocate and garbage
2813    collect when called from Haskell via _ccall_GC.
2814
2815    It might be useful to provide an interface whereby the programmer
2816    can specify more roots (ToDo).
2817    
2818    This needs to be protected by the GC condition variable above.  KH.
2819    -------------------------------------------------------------------------- */
2820
2821 static void (*extra_roots)(evac_fn);
2822
2823 void
2824 performGC(void)
2825 {
2826 #ifdef THREADED_RTS
2827     // ToDo: we have to grab all the capabilities here.
2828     errorBelch("performGC not supported in threaded RTS (yet)");
2829     stg_exit(EXIT_FAILURE);
2830 #endif
2831     /* Obligated to hold this lock upon entry */
2832     GarbageCollect(GetRoots,rtsFalse);
2833 }
2834
2835 void
2836 performMajorGC(void)
2837 {
2838 #ifdef THREADED_RTS
2839     errorBelch("performMayjorGC not supported in threaded RTS (yet)");
2840     stg_exit(EXIT_FAILURE);
2841 #endif
2842     GarbageCollect(GetRoots,rtsTrue);
2843 }
2844
2845 static void
2846 AllRoots(evac_fn evac)
2847 {
2848     GetRoots(evac);             // the scheduler's roots
2849     extra_roots(evac);          // the user's roots
2850 }
2851
2852 void
2853 performGCWithRoots(void (*get_roots)(evac_fn))
2854 {
2855 #ifdef THREADED_RTS
2856     errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
2857     stg_exit(EXIT_FAILURE);
2858 #endif
2859     extra_roots = get_roots;
2860     GarbageCollect(AllRoots,rtsFalse);
2861 }
2862
2863 /* -----------------------------------------------------------------------------
2864    Stack overflow
2865
2866    If the thread has reached its maximum stack size, then raise the
2867    StackOverflow exception in the offending thread.  Otherwise
2868    relocate the TSO into a larger chunk of memory and adjust its stack
2869    size appropriately.
2870    -------------------------------------------------------------------------- */
2871
2872 static StgTSO *
2873 threadStackOverflow(Capability *cap, StgTSO *tso)
2874 {
2875   nat new_stack_size, stack_words;
2876   lnat new_tso_size;
2877   StgPtr new_sp;
2878   StgTSO *dest;
2879
2880   IF_DEBUG(sanity,checkTSO(tso));
2881   if (tso->stack_size >= tso->max_stack_size) {
2882
2883     IF_DEBUG(gc,
2884              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2885                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2886              /* If we're debugging, just print out the top of the stack */
2887              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2888                                               tso->sp+64)));
2889
2890     /* Send this thread the StackOverflow exception */
2891     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2892     return tso;
2893   }
2894
2895   /* Try to double the current stack size.  If that takes us over the
2896    * maximum stack size for this thread, then use the maximum instead.
2897    * Finally round up so the TSO ends up as a whole number of blocks.
2898    */
2899   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2900   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2901                                        TSO_STRUCT_SIZE)/sizeof(W_);
2902   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2903   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2904
2905   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2906
2907   dest = (StgTSO *)allocate(new_tso_size);
2908   TICK_ALLOC_TSO(new_stack_size,0);
2909
2910   /* copy the TSO block and the old stack into the new area */
2911   memcpy(dest,tso,TSO_STRUCT_SIZE);
2912   stack_words = tso->stack + tso->stack_size - tso->sp;
2913   new_sp = (P_)dest + new_tso_size - stack_words;
2914   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2915
2916   /* relocate the stack pointers... */
2917   dest->sp         = new_sp;
2918   dest->stack_size = new_stack_size;
2919         
2920   /* Mark the old TSO as relocated.  We have to check for relocated
2921    * TSOs in the garbage collector and any primops that deal with TSOs.
2922    *
2923    * It's important to set the sp value to just beyond the end
2924    * of the stack, so we don't attempt to scavenge any part of the
2925    * dead TSO's stack.
2926    */
2927   tso->what_next = ThreadRelocated;
2928   tso->link = dest;
2929   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2930   tso->why_blocked = NotBlocked;
2931
2932   IF_PAR_DEBUG(verbose,
2933                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2934                      tso->id, tso, tso->stack_size);
2935                /* If we're debugging, just print out the top of the stack */
2936                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2937                                                 tso->sp+64)));
2938   
2939   IF_DEBUG(sanity,checkTSO(tso));
2940 #if 0
2941   IF_DEBUG(scheduler,printTSO(dest));
2942 #endif
2943
2944   return dest;
2945 }
2946
2947 /* ---------------------------------------------------------------------------
2948    Wake up a queue that was blocked on some resource.
2949    ------------------------------------------------------------------------ */
2950
2951 #if defined(GRAN)
2952 STATIC_INLINE void
2953 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2954 {
2955 }
2956 #elif defined(PARALLEL_HASKELL)
2957 STATIC_INLINE void
2958 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2959 {
2960   /* write RESUME events to log file and
2961      update blocked and fetch time (depending on type of the orig closure) */
2962   if (RtsFlags.ParFlags.ParStats.Full) {
2963     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2964                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2965                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2966     if (emptyRunQueue())
2967       emitSchedule = rtsTrue;
2968
2969     switch (get_itbl(node)->type) {
2970         case FETCH_ME_BQ:
2971           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2972           break;
2973         case RBH:
2974         case FETCH_ME:
2975         case BLACKHOLE_BQ:
2976           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2977           break;
2978 #ifdef DIST
2979         case MVAR:
2980           break;
2981 #endif    
2982         default:
2983           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2984         }
2985       }
2986 }
2987 #endif
2988
2989 #if defined(GRAN)
2990 StgBlockingQueueElement *
2991 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2992 {
2993     StgTSO *tso;
2994     PEs node_loc, tso_loc;
2995
2996     node_loc = where_is(node); // should be lifted out of loop
2997     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2998     tso_loc = where_is((StgClosure *)tso);
2999     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3000       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
3001       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3002       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3003       // insertThread(tso, node_loc);
3004       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3005                 ResumeThread,
3006                 tso, node, (rtsSpark*)NULL);
3007       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3008       // len_local++;
3009       // len++;
3010     } else { // TSO is remote (actually should be FMBQ)
3011       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3012                                   RtsFlags.GranFlags.Costs.gunblocktime +
3013                                   RtsFlags.GranFlags.Costs.latency;
3014       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3015                 UnblockThread,
3016                 tso, node, (rtsSpark*)NULL);
3017       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3018       // len++;
3019     }
3020     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3021     IF_GRAN_DEBUG(bq,
3022                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3023                           (node_loc==tso_loc ? "Local" : "Global"), 
3024                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3025     tso->block_info.closure = NULL;
3026     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
3027                              tso->id, tso));
3028 }
3029 #elif defined(PARALLEL_HASKELL)
3030 StgBlockingQueueElement *
3031 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3032 {
3033     StgBlockingQueueElement *next;
3034
3035     switch (get_itbl(bqe)->type) {
3036     case TSO:
3037       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3038       /* if it's a TSO just push it onto the run_queue */
3039       next = bqe->link;
3040       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3041       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3042       threadRunnable();
3043       unblockCount(bqe, node);
3044       /* reset blocking status after dumping event */
3045       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3046       break;
3047
3048     case BLOCKED_FETCH:
3049       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3050       next = bqe->link;
3051       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3052       PendingFetches = (StgBlockedFetch *)bqe;
3053       break;
3054
3055 # if defined(DEBUG)
3056       /* can ignore this case in a non-debugging setup; 
3057          see comments on RBHSave closures above */
3058     case CONSTR:
3059       /* check that the closure is an RBHSave closure */
3060       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3061              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3062              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3063       break;
3064
3065     default:
3066       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3067            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3068            (StgClosure *)bqe);
3069 # endif
3070     }
3071   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3072   return next;
3073 }
3074 #endif
3075
3076 StgTSO *
3077 unblockOne(Capability *cap, StgTSO *tso)
3078 {
3079   StgTSO *next;
3080
3081   ASSERT(get_itbl(tso)->type == TSO);
3082   ASSERT(tso->why_blocked != NotBlocked);
3083   tso->why_blocked = NotBlocked;
3084   next = tso->link;
3085   tso->link = END_TSO_QUEUE;
3086
3087   // We might have just migrated this TSO to our Capability:
3088   if (tso->bound) {
3089       tso->bound->cap = cap;
3090   }
3091
3092   appendToRunQueue(cap,tso);
3093
3094   // we're holding a newly woken thread, make sure we context switch
3095   // quickly so we can migrate it if necessary.
3096   context_switch = 1;
3097   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3098   return next;
3099 }
3100
3101
3102 #if defined(GRAN)
3103 void 
3104 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3105 {
3106   StgBlockingQueueElement *bqe;
3107   PEs node_loc;
3108   nat len = 0; 
3109
3110   IF_GRAN_DEBUG(bq, 
3111                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3112                       node, CurrentProc, CurrentTime[CurrentProc], 
3113                       CurrentTSO->id, CurrentTSO));
3114
3115   node_loc = where_is(node);
3116
3117   ASSERT(q == END_BQ_QUEUE ||
3118          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3119          get_itbl(q)->type == CONSTR); // closure (type constructor)
3120   ASSERT(is_unique(node));
3121
3122   /* FAKE FETCH: magically copy the node to the tso's proc;
3123      no Fetch necessary because in reality the node should not have been 
3124      moved to the other PE in the first place
3125   */
3126   if (CurrentProc!=node_loc) {
3127     IF_GRAN_DEBUG(bq, 
3128                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3129                         node, node_loc, CurrentProc, CurrentTSO->id, 
3130                         // CurrentTSO, where_is(CurrentTSO),
3131                         node->header.gran.procs));
3132     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3133     IF_GRAN_DEBUG(bq, 
3134                   debugBelch("## new bitmask of node %p is %#x\n",
3135                         node, node->header.gran.procs));
3136     if (RtsFlags.GranFlags.GranSimStats.Global) {
3137       globalGranStats.tot_fake_fetches++;
3138     }
3139   }
3140
3141   bqe = q;
3142   // ToDo: check: ASSERT(CurrentProc==node_loc);
3143   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3144     //next = bqe->link;
3145     /* 
3146        bqe points to the current element in the queue
3147        next points to the next element in the queue
3148     */
3149     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3150     //tso_loc = where_is(tso);
3151     len++;
3152     bqe = unblockOne(bqe, node);
3153   }
3154
3155   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3156      the closure to make room for the anchor of the BQ */
3157   if (bqe!=END_BQ_QUEUE) {
3158     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3159     /*
3160     ASSERT((info_ptr==&RBH_Save_0_info) ||
3161            (info_ptr==&RBH_Save_1_info) ||
3162            (info_ptr==&RBH_Save_2_info));
3163     */
3164     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3165     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3166     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3167
3168     IF_GRAN_DEBUG(bq,
3169                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3170                         node, info_type(node)));
3171   }
3172
3173   /* statistics gathering */
3174   if (RtsFlags.GranFlags.GranSimStats.Global) {
3175     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3176     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3177     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3178     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3179   }
3180   IF_GRAN_DEBUG(bq,
3181                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3182                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3183 }
3184 #elif defined(PARALLEL_HASKELL)
3185 void 
3186 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3187 {
3188   StgBlockingQueueElement *bqe;
3189
3190   IF_PAR_DEBUG(verbose, 
3191                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3192                      node, mytid));
3193 #ifdef DIST  
3194   //RFP
3195   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3196     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3197     return;
3198   }
3199 #endif
3200   
3201   ASSERT(q == END_BQ_QUEUE ||
3202          get_itbl(q)->type == TSO ||           
3203          get_itbl(q)->type == BLOCKED_FETCH || 
3204          get_itbl(q)->type == CONSTR); 
3205
3206   bqe = q;
3207   while (get_itbl(bqe)->type==TSO || 
3208          get_itbl(bqe)->type==BLOCKED_FETCH) {
3209     bqe = unblockOne(bqe, node);
3210   }
3211 }
3212
3213 #else   /* !GRAN && !PARALLEL_HASKELL */
3214
3215 void
3216 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3217 {
3218     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3219                              // Exception.cmm
3220     while (tso != END_TSO_QUEUE) {
3221         tso = unblockOne(cap,tso);
3222     }
3223 }
3224 #endif
3225
3226 /* ---------------------------------------------------------------------------
3227    Interrupt execution
3228    - usually called inside a signal handler so it mustn't do anything fancy.   
3229    ------------------------------------------------------------------------ */
3230
3231 void
3232 interruptStgRts(void)
3233 {
3234     interrupted    = 1;
3235     context_switch = 1;
3236 #if defined(THREADED_RTS)
3237     prodAllCapabilities();
3238 #endif
3239 }
3240
3241 /* -----------------------------------------------------------------------------
3242    Unblock a thread
3243
3244    This is for use when we raise an exception in another thread, which
3245    may be blocked.
3246    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3247    -------------------------------------------------------------------------- */
3248
3249 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3250 /*
3251   NB: only the type of the blocking queue is different in GranSim and GUM
3252       the operations on the queue-elements are the same
3253       long live polymorphism!
3254
3255   Locks: sched_mutex is held upon entry and exit.
3256
3257 */
3258 static void
3259 unblockThread(Capability *cap, StgTSO *tso)
3260 {
3261   StgBlockingQueueElement *t, **last;
3262
3263   switch (tso->why_blocked) {
3264
3265   case NotBlocked:
3266     return;  /* not blocked */
3267
3268   case BlockedOnSTM:
3269     // Be careful: nothing to do here!  We tell the scheduler that the thread
3270     // is runnable and we leave it to the stack-walking code to abort the 
3271     // transaction while unwinding the stack.  We should perhaps have a debugging
3272     // test to make sure that this really happens and that the 'zombie' transaction
3273     // does not get committed.
3274     goto done;
3275
3276   case BlockedOnMVar:
3277     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3278     {
3279       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3280       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3281
3282       last = (StgBlockingQueueElement **)&mvar->head;
3283       for (t = (StgBlockingQueueElement *)mvar->head; 
3284            t != END_BQ_QUEUE; 
3285            last = &t->link, last_tso = t, t = t->link) {
3286         if (t == (StgBlockingQueueElement *)tso) {
3287           *last = (StgBlockingQueueElement *)tso->link;
3288           if (mvar->tail == tso) {
3289             mvar->tail = (StgTSO *)last_tso;
3290           }
3291           goto done;
3292         }
3293       }
3294       barf("unblockThread (MVAR): TSO not found");
3295     }
3296
3297   case BlockedOnBlackHole:
3298     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3299     {
3300       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3301
3302       last = &bq->blocking_queue;
3303       for (t = bq->blocking_queue; 
3304            t != END_BQ_QUEUE; 
3305            last = &t->link, t = t->link) {
3306         if (t == (StgBlockingQueueElement *)tso) {
3307           *last = (StgBlockingQueueElement *)tso->link;
3308           goto done;
3309         }
3310       }
3311       barf("unblockThread (BLACKHOLE): TSO not found");
3312     }
3313
3314   case BlockedOnException:
3315     {
3316       StgTSO *target  = tso->block_info.tso;
3317
3318       ASSERT(get_itbl(target)->type == TSO);
3319
3320       if (target->what_next == ThreadRelocated) {
3321           target = target->link;
3322           ASSERT(get_itbl(target)->type == TSO);
3323       }
3324
3325       ASSERT(target->blocked_exceptions != NULL);
3326
3327       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3328       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3329            t != END_BQ_QUEUE; 
3330            last = &t->link, t = t->link) {
3331         ASSERT(get_itbl(t)->type == TSO);
3332         if (t == (StgBlockingQueueElement *)tso) {
3333           *last = (StgBlockingQueueElement *)tso->link;
3334           goto done;
3335         }
3336       }
3337       barf("unblockThread (Exception): TSO not found");
3338     }
3339
3340   case BlockedOnRead:
3341   case BlockedOnWrite:
3342 #if defined(mingw32_HOST_OS)
3343   case BlockedOnDoProc:
3344 #endif
3345     {
3346       /* take TSO off blocked_queue */
3347       StgBlockingQueueElement *prev = NULL;
3348       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3349            prev = t, t = t->link) {
3350         if (t == (StgBlockingQueueElement *)tso) {
3351           if (prev == NULL) {
3352             blocked_queue_hd = (StgTSO *)t->link;
3353             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3354               blocked_queue_tl = END_TSO_QUEUE;
3355             }
3356           } else {
3357             prev->link = t->link;
3358             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3359               blocked_queue_tl = (StgTSO *)prev;
3360             }
3361           }
3362 #if defined(mingw32_HOST_OS)
3363           /* (Cooperatively) signal that the worker thread should abort
3364            * the request.
3365            */
3366           abandonWorkRequest(tso->block_info.async_result->reqID);
3367 #endif
3368           goto done;
3369         }
3370       }
3371       barf("unblockThread (I/O): TSO not found");
3372     }
3373
3374   case BlockedOnDelay:
3375     {
3376       /* take TSO off sleeping_queue */
3377       StgBlockingQueueElement *prev = NULL;
3378       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3379            prev = t, t = t->link) {
3380         if (t == (StgBlockingQueueElement *)tso) {
3381           if (prev == NULL) {
3382             sleeping_queue = (StgTSO *)t->link;
3383           } else {
3384             prev->link = t->link;
3385           }
3386           goto done;
3387         }
3388       }
3389       barf("unblockThread (delay): TSO not found");
3390     }
3391
3392   default:
3393     barf("unblockThread");
3394   }
3395
3396  done:
3397   tso->link = END_TSO_QUEUE;
3398   tso->why_blocked = NotBlocked;
3399   tso->block_info.closure = NULL;
3400   pushOnRunQueue(cap,tso);
3401 }
3402 #else
3403 static void
3404 unblockThread(Capability *cap, StgTSO *tso)
3405 {
3406   StgTSO *t, **last;
3407   
3408   /* To avoid locking unnecessarily. */
3409   if (tso->why_blocked == NotBlocked) {
3410     return;
3411   }
3412
3413   switch (tso->why_blocked) {
3414
3415   case BlockedOnSTM:
3416     // Be careful: nothing to do here!  We tell the scheduler that the thread
3417     // is runnable and we leave it to the stack-walking code to abort the 
3418     // transaction while unwinding the stack.  We should perhaps have a debugging
3419     // test to make sure that this really happens and that the 'zombie' transaction
3420     // does not get committed.
3421     goto done;
3422
3423   case BlockedOnMVar:
3424     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3425     {
3426       StgTSO *last_tso = END_TSO_QUEUE;
3427       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3428
3429       last = &mvar->head;
3430       for (t = mvar->head; t != END_TSO_QUEUE; 
3431            last = &t->link, last_tso = t, t = t->link) {
3432         if (t == tso) {
3433           *last = tso->link;
3434           if (mvar->tail == tso) {
3435             mvar->tail = last_tso;
3436           }
3437           goto done;
3438         }
3439       }
3440       barf("unblockThread (MVAR): TSO not found");
3441     }
3442
3443   case BlockedOnBlackHole:
3444     {
3445       last = &blackhole_queue;
3446       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3447            last = &t->link, t = t->link) {
3448         if (t == tso) {
3449           *last = tso->link;
3450           goto done;
3451         }
3452       }
3453       barf("unblockThread (BLACKHOLE): TSO not found");
3454     }
3455
3456   case BlockedOnException:
3457     {
3458       StgTSO *target  = tso->block_info.tso;
3459
3460       ASSERT(get_itbl(target)->type == TSO);
3461
3462       while (target->what_next == ThreadRelocated) {
3463           target = target->link;
3464           ASSERT(get_itbl(target)->type == TSO);
3465       }
3466       
3467       ASSERT(target->blocked_exceptions != NULL);
3468
3469       last = &target->blocked_exceptions;
3470       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3471            last = &t->link, t = t->link) {
3472         ASSERT(get_itbl(t)->type == TSO);
3473         if (t == tso) {
3474           *last = tso->link;
3475           goto done;
3476         }
3477       }
3478       barf("unblockThread (Exception): TSO not found");
3479     }
3480
3481 #if !defined(THREADED_RTS)
3482   case BlockedOnRead:
3483   case BlockedOnWrite:
3484 #if defined(mingw32_HOST_OS)
3485   case BlockedOnDoProc:
3486 #endif
3487     {
3488       StgTSO *prev = NULL;
3489       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3490            prev = t, t = t->link) {
3491         if (t == tso) {
3492           if (prev == NULL) {
3493             blocked_queue_hd = t->link;
3494             if (blocked_queue_tl == t) {
3495               blocked_queue_tl = END_TSO_QUEUE;
3496             }
3497           } else {
3498             prev->link = t->link;
3499             if (blocked_queue_tl == t) {
3500               blocked_queue_tl = prev;
3501             }
3502           }
3503 #if defined(mingw32_HOST_OS)
3504           /* (Cooperatively) signal that the worker thread should abort
3505            * the request.
3506            */
3507           abandonWorkRequest(tso->block_info.async_result->reqID);
3508 #endif
3509           goto done;
3510         }
3511       }
3512       barf("unblockThread (I/O): TSO not found");
3513     }
3514
3515   case BlockedOnDelay:
3516     {
3517       StgTSO *prev = NULL;
3518       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3519            prev = t, t = t->link) {
3520         if (t == tso) {
3521           if (prev == NULL) {
3522             sleeping_queue = t->link;
3523           } else {
3524             prev->link = t->link;
3525           }
3526           goto done;
3527         }
3528       }
3529       barf("unblockThread (delay): TSO not found");
3530     }
3531 #endif
3532
3533   default:
3534     barf("unblockThread");
3535   }
3536
3537  done:
3538   tso->link = END_TSO_QUEUE;
3539   tso->why_blocked = NotBlocked;
3540   tso->block_info.closure = NULL;
3541   appendToRunQueue(cap,tso);
3542 }
3543 #endif
3544
3545 /* -----------------------------------------------------------------------------
3546  * checkBlackHoles()
3547  *
3548  * Check the blackhole_queue for threads that can be woken up.  We do
3549  * this periodically: before every GC, and whenever the run queue is
3550  * empty.
3551  *
3552  * An elegant solution might be to just wake up all the blocked
3553  * threads with awakenBlockedQueue occasionally: they'll go back to
3554  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3555  * doesn't give us a way to tell whether we've actually managed to
3556  * wake up any threads, so we would be busy-waiting.
3557  *
3558  * -------------------------------------------------------------------------- */
3559
3560 static rtsBool
3561 checkBlackHoles (Capability *cap)
3562 {
3563     StgTSO **prev, *t;
3564     rtsBool any_woke_up = rtsFalse;
3565     StgHalfWord type;
3566
3567     // blackhole_queue is global:
3568     ASSERT_LOCK_HELD(&sched_mutex);
3569
3570     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3571
3572     // ASSUMES: sched_mutex
3573     prev = &blackhole_queue;
3574     t = blackhole_queue;
3575     while (t != END_TSO_QUEUE) {
3576         ASSERT(t->why_blocked == BlockedOnBlackHole);
3577         type = get_itbl(t->block_info.closure)->type;
3578         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3579             IF_DEBUG(sanity,checkTSO(t));
3580             t = unblockOne(cap, t);
3581             // urk, the threads migrate to the current capability
3582             // here, but we'd like to keep them on the original one.
3583             *prev = t;
3584             any_woke_up = rtsTrue;
3585         } else {
3586             prev = &t->link;
3587             t = t->link;
3588         }
3589     }
3590
3591     return any_woke_up;
3592 }
3593
3594 /* -----------------------------------------------------------------------------
3595  * raiseAsync()
3596  *
3597  * The following function implements the magic for raising an
3598  * asynchronous exception in an existing thread.
3599  *
3600  * We first remove the thread from any queue on which it might be
3601  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3602  *
3603  * We strip the stack down to the innermost CATCH_FRAME, building
3604  * thunks in the heap for all the active computations, so they can 
3605  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3606  * an application of the handler to the exception, and push it on
3607  * the top of the stack.
3608  * 
3609  * How exactly do we save all the active computations?  We create an
3610  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3611  * AP_STACKs pushes everything from the corresponding update frame
3612  * upwards onto the stack.  (Actually, it pushes everything up to the
3613  * next update frame plus a pointer to the next AP_STACK object.
3614  * Entering the next AP_STACK object pushes more onto the stack until we
3615  * reach the last AP_STACK object - at which point the stack should look
3616  * exactly as it did when we killed the TSO and we can continue
3617  * execution by entering the closure on top of the stack.
3618  *
3619  * We can also kill a thread entirely - this happens if either (a) the 
3620  * exception passed to raiseAsync is NULL, or (b) there's no
3621  * CATCH_FRAME on the stack.  In either case, we strip the entire
3622  * stack and replace the thread with a zombie.
3623  *
3624  * ToDo: in SMP mode, this function is only safe if either (a) we hold
3625  * all the Capabilities (eg. in GC), or (b) we own the Capability that
3626  * the TSO is currently blocked on or on the run queue of.
3627  *
3628  * -------------------------------------------------------------------------- */
3629  
3630 void
3631 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3632 {
3633     raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3634 }
3635
3636 void
3637 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3638 {
3639     raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3640 }
3641
3642 static void
3643 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3644             rtsBool stop_at_atomically, StgPtr stop_here)
3645 {
3646     StgRetInfoTable *info;
3647     StgPtr sp, frame;
3648     nat i;
3649   
3650     // Thread already dead?
3651     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3652         return;
3653     }
3654
3655     IF_DEBUG(scheduler, 
3656              sched_belch("raising exception in thread %ld.", (long)tso->id));
3657     
3658     // Remove it from any blocking queues
3659     unblockThread(cap,tso);
3660
3661     // mark it dirty; we're about to change its stack.
3662     dirtyTSO(tso);
3663
3664     sp = tso->sp;
3665     
3666     // The stack freezing code assumes there's a closure pointer on
3667     // the top of the stack, so we have to arrange that this is the case...
3668     //
3669     if (sp[0] == (W_)&stg_enter_info) {
3670         sp++;
3671     } else {
3672         sp--;
3673         sp[0] = (W_)&stg_dummy_ret_closure;
3674     }
3675
3676     frame = sp + 1;
3677     while (stop_here == NULL || frame < stop_here) {
3678
3679         // 1. Let the top of the stack be the "current closure"
3680         //
3681         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3682         // CATCH_FRAME.
3683         //
3684         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3685         // current closure applied to the chunk of stack up to (but not
3686         // including) the update frame.  This closure becomes the "current
3687         // closure".  Go back to step 2.
3688         //
3689         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3690         // top of the stack applied to the exception.
3691         // 
3692         // 5. If it's a STOP_FRAME, then kill the thread.
3693         // 
3694         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3695         // transaction
3696        
3697         info = get_ret_itbl((StgClosure *)frame);
3698
3699         switch (info->i.type) {
3700
3701         case UPDATE_FRAME:
3702         {
3703             StgAP_STACK * ap;
3704             nat words;
3705             
3706             // First build an AP_STACK consisting of the stack chunk above the
3707             // current update frame, with the top word on the stack as the
3708             // fun field.
3709             //
3710             words = frame - sp - 1;
3711             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3712             
3713             ap->size = words;
3714             ap->fun  = (StgClosure *)sp[0];
3715             sp++;
3716             for(i=0; i < (nat)words; ++i) {
3717                 ap->payload[i] = (StgClosure *)*sp++;
3718             }
3719             
3720             SET_HDR(ap,&stg_AP_STACK_info,
3721                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3722             TICK_ALLOC_UP_THK(words+1,0);
3723             
3724             IF_DEBUG(scheduler,
3725                      debugBelch("sched: Updating ");
3726                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3727                      debugBelch(" with ");
3728                      printObj((StgClosure *)ap);
3729                 );
3730
3731             // Replace the updatee with an indirection
3732             //
3733             // Warning: if we're in a loop, more than one update frame on
3734             // the stack may point to the same object.  Be careful not to
3735             // overwrite an IND_OLDGEN in this case, because we'll screw
3736             // up the mutable lists.  To be on the safe side, don't
3737             // overwrite any kind of indirection at all.  See also
3738             // threadSqueezeStack in GC.c, where we have to make a similar
3739             // check.
3740             //
3741             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3742                 // revert the black hole
3743                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3744                                (StgClosure *)ap);
3745             }
3746             sp += sizeofW(StgUpdateFrame) - 1;
3747             sp[0] = (W_)ap; // push onto stack
3748             frame = sp + 1;
3749             continue; //no need to bump frame
3750         }
3751
3752         case STOP_FRAME:
3753             // We've stripped the entire stack, the thread is now dead.
3754             tso->what_next = ThreadKilled;
3755             tso->sp = frame + sizeofW(StgStopFrame);
3756             return;
3757
3758         case CATCH_FRAME:
3759             // If we find a CATCH_FRAME, and we've got an exception to raise,
3760             // then build the THUNK raise(exception), and leave it on
3761             // top of the CATCH_FRAME ready to enter.
3762             //
3763         {
3764 #ifdef PROFILING
3765             StgCatchFrame *cf = (StgCatchFrame *)frame;
3766 #endif
3767             StgThunk *raise;
3768             
3769             if (exception == NULL) break;
3770
3771             // we've got an exception to raise, so let's pass it to the
3772             // handler in this frame.
3773             //
3774             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3775             TICK_ALLOC_SE_THK(1,0);
3776             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3777             raise->payload[0] = exception;
3778             
3779             // throw away the stack from Sp up to the CATCH_FRAME.
3780             //
3781             sp = frame - 1;
3782             
3783             /* Ensure that async excpetions are blocked now, so we don't get
3784              * a surprise exception before we get around to executing the
3785              * handler.
3786              */
3787             if (tso->blocked_exceptions == NULL) {
3788                 tso->blocked_exceptions = END_TSO_QUEUE;
3789             }
3790
3791             /* Put the newly-built THUNK on top of the stack, ready to execute
3792              * when the thread restarts.
3793              */
3794             sp[0] = (W_)raise;
3795             sp[-1] = (W_)&stg_enter_info;
3796             tso->sp = sp-1;
3797             tso->what_next = ThreadRunGHC;
3798             IF_DEBUG(sanity, checkTSO(tso));
3799             return;
3800         }
3801             
3802         case ATOMICALLY_FRAME:
3803             if (stop_at_atomically) {
3804                 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3805                 stmCondemnTransaction(cap, tso -> trec);
3806 #ifdef REG_R1
3807                 tso->sp = frame;
3808 #else
3809                 // R1 is not a register: the return convention for IO in
3810                 // this case puts the return value on the stack, so we
3811                 // need to set up the stack to return to the atomically
3812                 // frame properly...
3813                 tso->sp = frame - 2;
3814                 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3815                 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3816 #endif
3817                 tso->what_next = ThreadRunGHC;
3818                 return;
3819             }
3820             // Not stop_at_atomically... fall through and abort the
3821             // transaction.
3822             
3823         case CATCH_RETRY_FRAME:
3824             // IF we find an ATOMICALLY_FRAME then we abort the
3825             // current transaction and propagate the exception.  In
3826             // this case (unlike ordinary exceptions) we do not care
3827             // whether the transaction is valid or not because its
3828             // possible validity cannot have caused the exception
3829             // and will not be visible after the abort.
3830             IF_DEBUG(stm,
3831                      debugBelch("Found atomically block delivering async exception\n"));
3832             StgTRecHeader *trec = tso -> trec;
3833             StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3834             stmAbortTransaction(cap, trec);
3835             tso -> trec = outer;
3836             break;
3837             
3838         default:
3839             break;
3840         }
3841
3842         // move on to the next stack frame
3843         frame += stack_frame_sizeW((StgClosure *)frame);
3844     }
3845
3846     // if we got here, then we stopped at stop_here
3847     ASSERT(stop_here != NULL);
3848 }
3849
3850 /* -----------------------------------------------------------------------------
3851    Deleting threads
3852
3853    This is used for interruption (^C) and forking, and corresponds to
3854    raising an exception but without letting the thread catch the
3855    exception.
3856    -------------------------------------------------------------------------- */
3857
3858 static void 
3859 deleteThread (Capability *cap, StgTSO *tso)
3860 {
3861   if (tso->why_blocked != BlockedOnCCall &&
3862       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3863       raiseAsync(cap,tso,NULL);
3864   }
3865 }
3866
3867 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3868 static void 
3869 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3870 { // for forkProcess only:
3871   // delete thread without giving it a chance to catch the KillThread exception
3872
3873   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3874       return;
3875   }
3876
3877   if (tso->why_blocked != BlockedOnCCall &&
3878       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3879       unblockThread(cap,tso);
3880   }
3881
3882   tso->what_next = ThreadKilled;
3883 }
3884 #endif
3885
3886 /* -----------------------------------------------------------------------------
3887    raiseExceptionHelper
3888    
3889    This function is called by the raise# primitve, just so that we can
3890    move some of the tricky bits of raising an exception from C-- into
3891    C.  Who knows, it might be a useful re-useable thing here too.
3892    -------------------------------------------------------------------------- */
3893
3894 StgWord
3895 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3896 {
3897     Capability *cap = regTableToCapability(reg);
3898     StgThunk *raise_closure = NULL;
3899     StgPtr p, next;
3900     StgRetInfoTable *info;
3901     //
3902     // This closure represents the expression 'raise# E' where E
3903     // is the exception raise.  It is used to overwrite all the
3904     // thunks which are currently under evaluataion.
3905     //
3906
3907     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
3908     // LDV profiling: stg_raise_info has THUNK as its closure
3909     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3910     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3911     // 1 does not cause any problem unless profiling is performed.
3912     // However, when LDV profiling goes on, we need to linearly scan
3913     // small object pool, where raise_closure is stored, so we should
3914     // use MIN_UPD_SIZE.
3915     //
3916     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3917     //                                 sizeofW(StgClosure)+1);
3918     //
3919
3920     //
3921     // Walk up the stack, looking for the catch frame.  On the way,
3922     // we update any closures pointed to from update frames with the
3923     // raise closure that we just built.
3924     //
3925     p = tso->sp;
3926     while(1) {
3927         info = get_ret_itbl((StgClosure *)p);
3928         next = p + stack_frame_sizeW((StgClosure *)p);
3929         switch (info->i.type) {
3930             
3931         case UPDATE_FRAME:
3932             // Only create raise_closure if we need to.
3933             if (raise_closure == NULL) {
3934                 raise_closure = 
3935                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3936                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3937                 raise_closure->payload[0] = exception;
3938             }
3939             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3940             p = next;
3941             continue;
3942
3943         case ATOMICALLY_FRAME:
3944             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3945             tso->sp = p;
3946             return ATOMICALLY_FRAME;
3947             
3948         case CATCH_FRAME:
3949             tso->sp = p;
3950             return CATCH_FRAME;
3951
3952         case CATCH_STM_FRAME:
3953             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3954             tso->sp = p;
3955             return CATCH_STM_FRAME;
3956             
3957         case STOP_FRAME:
3958             tso->sp = p;
3959             return STOP_FRAME;
3960
3961         case CATCH_RETRY_FRAME:
3962         default:
3963             p = next; 
3964             continue;
3965         }
3966     }
3967 }
3968
3969
3970 /* -----------------------------------------------------------------------------
3971    findRetryFrameHelper
3972
3973    This function is called by the retry# primitive.  It traverses the stack
3974    leaving tso->sp referring to the frame which should handle the retry.  
3975
3976    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3977    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3978
3979    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3980    despite the similar implementation.
3981
3982    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3983    not be created within memory transactions.
3984    -------------------------------------------------------------------------- */
3985
3986 StgWord
3987 findRetryFrameHelper (StgTSO *tso)
3988 {
3989   StgPtr           p, next;
3990   StgRetInfoTable *info;
3991
3992   p = tso -> sp;
3993   while (1) {
3994     info = get_ret_itbl((StgClosure *)p);
3995     next = p + stack_frame_sizeW((StgClosure *)p);
3996     switch (info->i.type) {
3997       
3998     case ATOMICALLY_FRAME:
3999       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
4000       tso->sp = p;
4001       return ATOMICALLY_FRAME;
4002       
4003     case CATCH_RETRY_FRAME:
4004       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
4005       tso->sp = p;
4006       return CATCH_RETRY_FRAME;
4007       
4008     case CATCH_STM_FRAME:
4009     default:
4010       ASSERT(info->i.type != CATCH_FRAME);
4011       ASSERT(info->i.type != STOP_FRAME);
4012       p = next; 
4013       continue;
4014     }
4015   }
4016 }
4017
4018 /* -----------------------------------------------------------------------------
4019    resurrectThreads is called after garbage collection on the list of
4020    threads found to be garbage.  Each of these threads will be woken
4021    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4022    on an MVar, or NonTermination if the thread was blocked on a Black
4023    Hole.
4024
4025    Locks: assumes we hold *all* the capabilities.
4026    -------------------------------------------------------------------------- */
4027
4028 void
4029 resurrectThreads (StgTSO *threads)
4030 {
4031     StgTSO *tso, *next;
4032     Capability *cap;
4033
4034     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4035         next = tso->global_link;
4036         tso->global_link = all_threads;
4037         all_threads = tso;
4038         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4039         
4040         // Wake up the thread on the Capability it was last on for a
4041         // bound thread, or last_free_capability otherwise.
4042         if (tso->bound) {
4043             cap = tso->bound->cap;
4044         } else {
4045             cap = last_free_capability;
4046         }
4047         
4048         switch (tso->why_blocked) {
4049         case BlockedOnMVar:
4050         case BlockedOnException:
4051             /* Called by GC - sched_mutex lock is currently held. */
4052             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4053             break;
4054         case BlockedOnBlackHole:
4055             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4056             break;
4057         case BlockedOnSTM:
4058             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4059             break;
4060         case NotBlocked:
4061             /* This might happen if the thread was blocked on a black hole
4062              * belonging to a thread that we've just woken up (raiseAsync
4063              * can wake up threads, remember...).
4064              */
4065             continue;
4066         default:
4067             barf("resurrectThreads: thread blocked in a strange way");
4068         }
4069     }
4070 }
4071
4072 /* ----------------------------------------------------------------------------
4073  * Debugging: why is a thread blocked
4074  * [Also provides useful information when debugging threaded programs
4075  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4076    ------------------------------------------------------------------------- */
4077
4078 #if DEBUG
4079 static void
4080 printThreadBlockage(StgTSO *tso)
4081 {
4082   switch (tso->why_blocked) {
4083   case BlockedOnRead:
4084     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4085     break;
4086   case BlockedOnWrite:
4087     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4088     break;
4089 #if defined(mingw32_HOST_OS)
4090     case BlockedOnDoProc:
4091     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4092     break;
4093 #endif
4094   case BlockedOnDelay:
4095     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4096     break;
4097   case BlockedOnMVar:
4098     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4099     break;
4100   case BlockedOnException:
4101     debugBelch("is blocked on delivering an exception to thread %d",
4102             tso->block_info.tso->id);
4103     break;
4104   case BlockedOnBlackHole:
4105     debugBelch("is blocked on a black hole");
4106     break;
4107   case NotBlocked:
4108     debugBelch("is not blocked");
4109     break;
4110 #if defined(PARALLEL_HASKELL)
4111   case BlockedOnGA:
4112     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4113             tso->block_info.closure, info_type(tso->block_info.closure));
4114     break;
4115   case BlockedOnGA_NoSend:
4116     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4117             tso->block_info.closure, info_type(tso->block_info.closure));
4118     break;
4119 #endif
4120   case BlockedOnCCall:
4121     debugBelch("is blocked on an external call");
4122     break;
4123   case BlockedOnCCall_NoUnblockExc:
4124     debugBelch("is blocked on an external call (exceptions were already blocked)");
4125     break;
4126   case BlockedOnSTM:
4127     debugBelch("is blocked on an STM operation");
4128     break;
4129   default:
4130     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4131          tso->why_blocked, tso->id, tso);
4132   }
4133 }
4134
4135 void
4136 printThreadStatus(StgTSO *t)
4137 {
4138     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4139     {
4140       void *label = lookupThreadLabel(t->id);
4141       if (label) debugBelch("[\"%s\"] ",(char *)label);
4142     }
4143     if (t->what_next == ThreadRelocated) {
4144         debugBelch("has been relocated...\n");
4145     } else {
4146         switch (t->what_next) {
4147         case ThreadKilled:
4148             debugBelch("has been killed");
4149             break;
4150         case ThreadComplete:
4151             debugBelch("has completed");
4152             break;
4153         default:
4154             printThreadBlockage(t);
4155         }
4156         debugBelch("\n");
4157     }
4158 }
4159
4160 void
4161 printAllThreads(void)
4162 {
4163   StgTSO *t, *next;
4164   nat i;
4165   Capability *cap;
4166
4167 # if defined(GRAN)
4168   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4169   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4170                        time_string, rtsFalse/*no commas!*/);
4171
4172   debugBelch("all threads at [%s]:\n", time_string);
4173 # elif defined(PARALLEL_HASKELL)
4174   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4175   ullong_format_string(CURRENT_TIME,
4176                        time_string, rtsFalse/*no commas!*/);
4177
4178   debugBelch("all threads at [%s]:\n", time_string);
4179 # else
4180   debugBelch("all threads:\n");
4181 # endif
4182
4183   for (i = 0; i < n_capabilities; i++) {
4184       cap = &capabilities[i];
4185       debugBelch("threads on capability %d:\n", cap->no);
4186       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4187           printThreadStatus(t);
4188       }
4189   }
4190
4191   debugBelch("other threads:\n");
4192   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4193       if (t->why_blocked != NotBlocked) {
4194           printThreadStatus(t);
4195       }
4196       if (t->what_next == ThreadRelocated) {
4197           next = t->link;
4198       } else {
4199           next = t->global_link;
4200       }
4201   }
4202 }
4203
4204 // useful from gdb
4205 void 
4206 printThreadQueue(StgTSO *t)
4207 {
4208     nat i = 0;
4209     for (; t != END_TSO_QUEUE; t = t->link) {
4210         printThreadStatus(t);
4211         i++;
4212     }
4213     debugBelch("%d threads on queue\n", i);
4214 }
4215
4216 /* 
4217    Print a whole blocking queue attached to node (debugging only).
4218 */
4219 # if defined(PARALLEL_HASKELL)
4220 void 
4221 print_bq (StgClosure *node)
4222 {
4223   StgBlockingQueueElement *bqe;
4224   StgTSO *tso;
4225   rtsBool end;
4226
4227   debugBelch("## BQ of closure %p (%s): ",
4228           node, info_type(node));
4229
4230   /* should cover all closures that may have a blocking queue */
4231   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4232          get_itbl(node)->type == FETCH_ME_BQ ||
4233          get_itbl(node)->type == RBH ||
4234          get_itbl(node)->type == MVAR);
4235     
4236   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4237
4238   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4239 }
4240
4241 /* 
4242    Print a whole blocking queue starting with the element bqe.
4243 */
4244 void 
4245 print_bqe (StgBlockingQueueElement *bqe)
4246 {
4247   rtsBool end;
4248
4249   /* 
4250      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4251   */
4252   for (end = (bqe==END_BQ_QUEUE);
4253        !end; // iterate until bqe points to a CONSTR
4254        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4255        bqe = end ? END_BQ_QUEUE : bqe->link) {
4256     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4257     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4258     /* types of closures that may appear in a blocking queue */
4259     ASSERT(get_itbl(bqe)->type == TSO ||           
4260            get_itbl(bqe)->type == BLOCKED_FETCH || 
4261            get_itbl(bqe)->type == CONSTR); 
4262     /* only BQs of an RBH end with an RBH_Save closure */
4263     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4264
4265     switch (get_itbl(bqe)->type) {
4266     case TSO:
4267       debugBelch(" TSO %u (%x),",
4268               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4269       break;
4270     case BLOCKED_FETCH:
4271       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4272               ((StgBlockedFetch *)bqe)->node, 
4273               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4274               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4275               ((StgBlockedFetch *)bqe)->ga.weight);
4276       break;
4277     case CONSTR:
4278       debugBelch(" %s (IP %p),",
4279               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4280                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4281                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4282                "RBH_Save_?"), get_itbl(bqe));
4283       break;
4284     default:
4285       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4286            info_type((StgClosure *)bqe)); // , node, info_type(node));
4287       break;
4288     }
4289   } /* for */
4290   debugBelch("\n");
4291 }
4292 # elif defined(GRAN)
4293 void 
4294 print_bq (StgClosure *node)
4295 {
4296   StgBlockingQueueElement *bqe;
4297   PEs node_loc, tso_loc;
4298   rtsBool end;
4299
4300   /* should cover all closures that may have a blocking queue */
4301   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4302          get_itbl(node)->type == FETCH_ME_BQ ||
4303          get_itbl(node)->type == RBH);
4304     
4305   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4306   node_loc = where_is(node);
4307
4308   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4309           node, info_type(node), node_loc);
4310
4311   /* 
4312      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4313   */
4314   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4315        !end; // iterate until bqe points to a CONSTR
4316        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4317     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4318     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4319     /* types of closures that may appear in a blocking queue */
4320     ASSERT(get_itbl(bqe)->type == TSO ||           
4321            get_itbl(bqe)->type == CONSTR); 
4322     /* only BQs of an RBH end with an RBH_Save closure */
4323     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4324
4325     tso_loc = where_is((StgClosure *)bqe);
4326     switch (get_itbl(bqe)->type) {
4327     case TSO:
4328       debugBelch(" TSO %d (%p) on [PE %d],",
4329               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4330       break;
4331     case CONSTR:
4332       debugBelch(" %s (IP %p),",
4333               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4334                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4335                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4336                "RBH_Save_?"), get_itbl(bqe));
4337       break;
4338     default:
4339       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4340            info_type((StgClosure *)bqe), node, info_type(node));
4341       break;
4342     }
4343   } /* for */
4344   debugBelch("\n");
4345 }
4346 # endif
4347
4348 #if defined(PARALLEL_HASKELL)
4349 static nat
4350 run_queue_len(void)
4351 {
4352     nat i;
4353     StgTSO *tso;
4354     
4355     for (i=0, tso=run_queue_hd; 
4356          tso != END_TSO_QUEUE;
4357          i++, tso=tso->link) {
4358         /* nothing */
4359     }
4360         
4361     return i;
4362 }
4363 #endif
4364
4365 void
4366 sched_belch(char *s, ...)
4367 {
4368     va_list ap;
4369     va_start(ap,s);
4370 #ifdef THREADED_RTS
4371     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4372 #elif defined(PARALLEL_HASKELL)
4373     debugBelch("== ");
4374 #else
4375     debugBelch("sched: ");
4376 #endif
4377     vdebugBelch(s, ap);
4378     debugBelch("\n");
4379     va_end(ap);
4380 }
4381
4382 #endif /* DEBUG */