[project @ 2005-10-26 10:42:54 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50
51 #ifdef HAVE_SYS_TYPES_H
52 #include <sys/types.h>
53 #endif
54 #ifdef HAVE_UNISTD_H
55 #include <unistd.h>
56 #endif
57
58 #include <string.h>
59 #include <stdlib.h>
60 #include <stdarg.h>
61
62 #ifdef HAVE_ERRNO_H
63 #include <errno.h>
64 #endif
65
66 // Turn off inlining when debugging - it obfuscates things
67 #ifdef DEBUG
68 # undef  STATIC_INLINE
69 # define STATIC_INLINE static
70 #endif
71
72 #ifdef THREADED_RTS
73 #define USED_WHEN_THREADED_RTS
74 #define USED_WHEN_NON_THREADED_RTS STG_UNUSED
75 #else
76 #define USED_WHEN_THREADED_RTS     STG_UNUSED
77 #define USED_WHEN_NON_THREADED_RTS
78 #endif
79
80 #ifdef SMP
81 #define USED_WHEN_SMP
82 #else
83 #define USED_WHEN_SMP STG_UNUSED
84 #endif
85
86 /* -----------------------------------------------------------------------------
87  * Global variables
88  * -------------------------------------------------------------------------- */
89
90 #if defined(GRAN)
91
92 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
93 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
94
95 /* 
96    In GranSim we have a runnable and a blocked queue for each processor.
97    In order to minimise code changes new arrays run_queue_hds/tls
98    are created. run_queue_hd is then a short cut (macro) for
99    run_queue_hds[CurrentProc] (see GranSim.h).
100    -- HWL
101 */
102 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
103 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
104 StgTSO *ccalling_threadss[MAX_PROC];
105 /* We use the same global list of threads (all_threads) in GranSim as in
106    the std RTS (i.e. we are cheating). However, we don't use this list in
107    the GranSim specific code at the moment (so we are only potentially
108    cheating).  */
109
110 #else /* !GRAN */
111
112 #if !defined(THREADED_RTS)
113 // Blocked/sleeping thrads
114 StgTSO *blocked_queue_hd = NULL;
115 StgTSO *blocked_queue_tl = NULL;
116 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
117 #endif
118
119 /* Threads blocked on blackholes.
120  * LOCK: sched_mutex+capability, or all capabilities
121  */
122 StgTSO *blackhole_queue = NULL;
123 #endif
124
125 /* The blackhole_queue should be checked for threads to wake up.  See
126  * Schedule.h for more thorough comment.
127  * LOCK: none (doesn't matter if we miss an update)
128  */
129 rtsBool blackholes_need_checking = rtsFalse;
130
131 /* Linked list of all threads.
132  * Used for detecting garbage collected threads.
133  * LOCK: sched_mutex+capability, or all capabilities
134  */
135 StgTSO *all_threads = NULL;
136
137 /* flag set by signal handler to precipitate a context switch
138  * LOCK: none (just an advisory flag)
139  */
140 int context_switch = 0;
141
142 /* flag that tracks whether we have done any execution in this time slice.
143  * LOCK: currently none, perhaps we should lock (but needs to be
144  * updated in the fast path of the scheduler).
145  */
146 nat recent_activity = ACTIVITY_YES;
147
148 /* if this flag is set as well, give up execution
149  * LOCK: none (changes once, from false->true)
150  */
151 rtsBool interrupted = rtsFalse;
152
153 /* Next thread ID to allocate.
154  * LOCK: sched_mutex
155  */
156 static StgThreadID next_thread_id = 1;
157
158 /* The smallest stack size that makes any sense is:
159  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
160  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
161  *  + 1                       (the closure to enter)
162  *  + 1                       (stg_ap_v_ret)
163  *  + 1                       (spare slot req'd by stg_ap_v_ret)
164  *
165  * A thread with this stack will bomb immediately with a stack
166  * overflow, which will increase its stack size.  
167  */
168 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
169
170 #if defined(GRAN)
171 StgTSO *CurrentTSO;
172 #endif
173
174 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
175  *  exists - earlier gccs apparently didn't.
176  *  -= chak
177  */
178 StgTSO dummy_tso;
179
180 /*
181  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
182  * in an MT setting, needed to signal that a worker thread shouldn't hang around
183  * in the scheduler when it is out of work.
184  */
185 rtsBool shutting_down_scheduler = rtsFalse;
186
187 /*
188  * This mutex protects most of the global scheduler data in
189  * the THREADED_RTS and (inc. SMP) runtime.
190  */
191 #if defined(THREADED_RTS)
192 Mutex sched_mutex = INIT_MUTEX_VAR;
193 #endif
194
195 #if defined(PARALLEL_HASKELL)
196 StgTSO *LastTSO;
197 rtsTime TimeOfLastYield;
198 rtsBool emitSchedule = rtsTrue;
199 #endif
200
201 /* -----------------------------------------------------------------------------
202  * static function prototypes
203  * -------------------------------------------------------------------------- */
204
205 static Capability *schedule (Capability *initialCapability, Task *task);
206
207 //
208 // These function all encapsulate parts of the scheduler loop, and are
209 // abstracted only to make the structure and control flow of the
210 // scheduler clearer.
211 //
212 static void schedulePreLoop (void);
213 static void scheduleStartSignalHandlers (void);
214 static void scheduleCheckBlockedThreads (Capability *cap);
215 static void scheduleCheckBlackHoles (Capability *cap);
216 static void scheduleDetectDeadlock (Capability *cap, Task *task);
217 #if defined(GRAN)
218 static StgTSO *scheduleProcessEvent(rtsEvent *event);
219 #endif
220 #if defined(PARALLEL_HASKELL)
221 static StgTSO *scheduleSendPendingMessages(void);
222 static void scheduleActivateSpark(void);
223 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
224 #endif
225 #if defined(PAR) || defined(GRAN)
226 static void scheduleGranParReport(void);
227 #endif
228 static void schedulePostRunThread(void);
229 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
230 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
231                                          StgTSO *t);
232 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
233                                     nat prev_what_next );
234 static void scheduleHandleThreadBlocked( StgTSO *t );
235 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
236                                              StgTSO *t );
237 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
238 static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);
239
240 static void unblockThread(Capability *cap, StgTSO *tso);
241 static rtsBool checkBlackHoles(Capability *cap);
242 static void AllRoots(evac_fn evac);
243
244 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
245
246 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
247                         rtsBool stop_at_atomically);
248
249 static void deleteThread (Capability *cap, StgTSO *tso);
250 static void deleteRunQueue (Capability *cap);
251
252 #ifdef DEBUG
253 static void printThreadBlockage(StgTSO *tso);
254 static void printThreadStatus(StgTSO *tso);
255 void printThreadQueue(StgTSO *tso);
256 #endif
257
258 #if defined(PARALLEL_HASKELL)
259 StgTSO * createSparkThread(rtsSpark spark);
260 StgTSO * activateSpark (rtsSpark spark);  
261 #endif
262
263 #ifdef DEBUG
264 static char *whatNext_strs[] = {
265   "(unknown)",
266   "ThreadRunGHC",
267   "ThreadInterpret",
268   "ThreadKilled",
269   "ThreadRelocated",
270   "ThreadComplete"
271 };
272 #endif
273
274 /* -----------------------------------------------------------------------------
275  * Putting a thread on the run queue: different scheduling policies
276  * -------------------------------------------------------------------------- */
277
278 STATIC_INLINE void
279 addToRunQueue( Capability *cap, StgTSO *t )
280 {
281 #if defined(PARALLEL_HASKELL)
282     if (RtsFlags.ParFlags.doFairScheduling) { 
283         // this does round-robin scheduling; good for concurrency
284         appendToRunQueue(cap,t);
285     } else {
286         // this does unfair scheduling; good for parallelism
287         pushOnRunQueue(cap,t);
288     }
289 #else
290     // this does round-robin scheduling; good for concurrency
291     appendToRunQueue(cap,t);
292 #endif
293 }
294
295 /* ---------------------------------------------------------------------------
296    Main scheduling loop.
297
298    We use round-robin scheduling, each thread returning to the
299    scheduler loop when one of these conditions is detected:
300
301       * out of heap space
302       * timer expires (thread yields)
303       * thread blocks
304       * thread ends
305       * stack overflow
306
307    GRAN version:
308      In a GranSim setup this loop iterates over the global event queue.
309      This revolves around the global event queue, which determines what 
310      to do next. Therefore, it's more complicated than either the 
311      concurrent or the parallel (GUM) setup.
312
313    GUM version:
314      GUM iterates over incoming messages.
315      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
316      and sends out a fish whenever it has nothing to do; in-between
317      doing the actual reductions (shared code below) it processes the
318      incoming messages and deals with delayed operations 
319      (see PendingFetches).
320      This is not the ugliest code you could imagine, but it's bloody close.
321
322    ------------------------------------------------------------------------ */
323
324 static Capability *
325 schedule (Capability *initialCapability, Task *task)
326 {
327   StgTSO *t;
328   Capability *cap;
329   StgThreadReturnCode ret;
330 #if defined(GRAN)
331   rtsEvent *event;
332 #elif defined(PARALLEL_HASKELL)
333   StgTSO *tso;
334   GlobalTaskId pe;
335   rtsBool receivedFinish = rtsFalse;
336 # if defined(DEBUG)
337   nat tp_size, sp_size; // stats only
338 # endif
339 #endif
340   nat prev_what_next;
341   rtsBool ready_to_gc;
342   rtsBool first = rtsTrue;
343   
344   cap = initialCapability;
345
346   // Pre-condition: this task owns initialCapability.
347   // The sched_mutex is *NOT* held
348   // NB. on return, we still hold a capability.
349
350   IF_DEBUG(scheduler,
351            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
352                        task, initialCapability);
353       );
354
355   schedulePreLoop();
356
357   // -----------------------------------------------------------
358   // Scheduler loop starts here:
359
360 #if defined(PARALLEL_HASKELL)
361 #define TERMINATION_CONDITION        (!receivedFinish)
362 #elif defined(GRAN)
363 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
364 #else
365 #define TERMINATION_CONDITION        rtsTrue
366 #endif
367
368   while (TERMINATION_CONDITION) {
369
370 #if defined(GRAN)
371       /* Choose the processor with the next event */
372       CurrentProc = event->proc;
373       CurrentTSO = event->tso;
374 #endif
375
376 #if defined(THREADED_RTS)
377       if (first) {
378           // don't yield the first time, we want a chance to run this
379           // thread for a bit, even if there are others banging at the
380           // door.
381           first = rtsFalse;
382       } else {
383           // Yield the capability to higher-priority tasks if necessary.
384           yieldCapability(&cap, task);
385       }
386 #endif
387       
388       ASSERT(cap->running_task == task);
389       ASSERT(task->cap == cap);
390       ASSERT(myTask() == task);
391
392     // Check whether we have re-entered the RTS from Haskell without
393     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
394     // call).
395     if (cap->in_haskell) {
396           errorBelch("schedule: re-entered unsafely.\n"
397                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
398           stg_exit(EXIT_FAILURE);
399     }
400
401     //
402     // Test for interruption.  If interrupted==rtsTrue, then either
403     // we received a keyboard interrupt (^C), or the scheduler is
404     // trying to shut down all the tasks (shutting_down_scheduler) in
405     // the threaded RTS.
406     //
407     if (interrupted) {
408         deleteRunQueue(cap);
409         if (shutting_down_scheduler) {
410             IF_DEBUG(scheduler, sched_belch("shutting down"));
411         } else {
412             IF_DEBUG(scheduler, sched_belch("interrupted"));
413         }
414     }
415
416 #if defined(not_yet) && defined(SMP)
417     //
418     // Top up the run queue from our spark pool.  We try to make the
419     // number of threads in the run queue equal to the number of
420     // free capabilities.
421     //
422     {
423         StgClosure *spark;
424         if (emptyRunQueue()) {
425             spark = findSpark(rtsFalse);
426             if (spark == NULL) {
427                 break; /* no more sparks in the pool */
428             } else {
429                 createSparkThread(spark);         
430                 IF_DEBUG(scheduler,
431                          sched_belch("==^^ turning spark of closure %p into a thread",
432                                      (StgClosure *)spark));
433             }
434         }
435     }
436 #endif // SMP
437
438     scheduleStartSignalHandlers();
439
440     // Only check the black holes here if we've nothing else to do.
441     // During normal execution, the black hole list only gets checked
442     // at GC time, to avoid repeatedly traversing this possibly long
443     // list each time around the scheduler.
444     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
445
446     scheduleCheckBlockedThreads(cap);
447
448     scheduleDetectDeadlock(cap,task);
449
450     // Normally, the only way we can get here with no threads to
451     // run is if a keyboard interrupt received during 
452     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
453     // Additionally, it is not fatal for the
454     // threaded RTS to reach here with no threads to run.
455     //
456     // win32: might be here due to awaitEvent() being abandoned
457     // as a result of a console event having been delivered.
458     if ( emptyRunQueue(cap) ) {
459 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
460         ASSERT(interrupted);
461 #endif
462         continue; // nothing to do
463     }
464
465 #if defined(PARALLEL_HASKELL)
466     scheduleSendPendingMessages();
467     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
468         continue;
469
470 #if defined(SPARKS)
471     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
472 #endif
473
474     /* If we still have no work we need to send a FISH to get a spark
475        from another PE */
476     if (emptyRunQueue(cap)) {
477         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
478         ASSERT(rtsFalse); // should not happen at the moment
479     }
480     // from here: non-empty run queue.
481     //  TODO: merge above case with this, only one call processMessages() !
482     if (PacketsWaiting()) {  /* process incoming messages, if
483                                 any pending...  only in else
484                                 because getRemoteWork waits for
485                                 messages as well */
486         receivedFinish = processMessages();
487     }
488 #endif
489
490 #if defined(GRAN)
491     scheduleProcessEvent(event);
492 #endif
493
494     // 
495     // Get a thread to run
496     //
497     t = popRunQueue(cap);
498
499 #if defined(GRAN) || defined(PAR)
500     scheduleGranParReport(); // some kind of debuging output
501 #else
502     // Sanity check the thread we're about to run.  This can be
503     // expensive if there is lots of thread switching going on...
504     IF_DEBUG(sanity,checkTSO(t));
505 #endif
506
507 #if defined(THREADED_RTS)
508     // Check whether we can run this thread in the current task.
509     // If not, we have to pass our capability to the right task.
510     {
511         Task *bound = t->bound;
512       
513         if (bound) {
514             if (bound == task) {
515                 IF_DEBUG(scheduler,
516                          sched_belch("### Running thread %d in bound thread",
517                                      t->id));
518                 // yes, the Haskell thread is bound to the current native thread
519             } else {
520                 IF_DEBUG(scheduler,
521                          sched_belch("### thread %d bound to another OS thread",
522                                      t->id));
523                 // no, bound to a different Haskell thread: pass to that thread
524                 pushOnRunQueue(cap,t);
525                 continue;
526             }
527         } else {
528             // The thread we want to run is unbound.
529             if (task->tso) { 
530                 IF_DEBUG(scheduler,
531                          sched_belch("### this OS thread cannot run thread %d", t->id));
532                 // no, the current native thread is bound to a different
533                 // Haskell thread, so pass it to any worker thread
534                 pushOnRunQueue(cap,t);
535                 continue; 
536             }
537         }
538     }
539 #endif
540
541     cap->r.rCurrentTSO = t;
542     
543     /* context switches are initiated by the timer signal, unless
544      * the user specified "context switch as often as possible", with
545      * +RTS -C0
546      */
547     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
548         && !emptyThreadQueues(cap)) {
549         context_switch = 1;
550     }
551          
552 run_thread:
553
554     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
555                               (long)t->id, whatNext_strs[t->what_next]));
556
557 #if defined(PROFILING)
558     startHeapProfTimer();
559 #endif
560
561     // ----------------------------------------------------------------------
562     // Run the current thread 
563
564     prev_what_next = t->what_next;
565
566     errno = t->saved_errno;
567     cap->in_haskell = rtsTrue;
568
569     recent_activity = ACTIVITY_YES;
570
571     switch (prev_what_next) {
572         
573     case ThreadKilled:
574     case ThreadComplete:
575         /* Thread already finished, return to scheduler. */
576         ret = ThreadFinished;
577         break;
578         
579     case ThreadRunGHC:
580     {
581         StgRegTable *r;
582         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
583         cap = regTableToCapability(r);
584         ret = r->rRet;
585         break;
586     }
587     
588     case ThreadInterpret:
589         cap = interpretBCO(cap);
590         ret = cap->r.rRet;
591         break;
592         
593     default:
594         barf("schedule: invalid what_next field");
595     }
596
597     cap->in_haskell = rtsFalse;
598
599 #ifdef SMP
600     // If ret is ThreadBlocked, and this Task is bound to the TSO that
601     // blocked, we are in limbo - the TSO is now owned by whatever it
602     // is blocked on, and may in fact already have been woken up,
603     // perhaps even on a different Capability.  It may be the case
604     // that task->cap != cap.  We better yield this Capability
605     // immediately and return to normaility.
606     if (ret == ThreadBlocked) continue;
607 #endif
608
609     ASSERT(cap->running_task == task);
610     ASSERT(task->cap == cap);
611     ASSERT(myTask() == task);
612
613     // The TSO might have moved, eg. if it re-entered the RTS and a GC
614     // happened.  So find the new location:
615     t = cap->r.rCurrentTSO;
616
617     // And save the current errno in this thread.
618     t->saved_errno = errno;
619
620     // ----------------------------------------------------------------------
621     
622     // Costs for the scheduler are assigned to CCS_SYSTEM
623 #if defined(PROFILING)
624     stopHeapProfTimer();
625     CCCS = CCS_SYSTEM;
626 #endif
627     
628     // We have run some Haskell code: there might be blackhole-blocked
629     // threads to wake up now.
630     // Lock-free test here should be ok, we're just setting a flag.
631     if ( blackhole_queue != END_TSO_QUEUE ) {
632         blackholes_need_checking = rtsTrue;
633     }
634     
635 #if defined(THREADED_RTS)
636     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
637 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
638     IF_DEBUG(scheduler,debugBelch("sched: "););
639 #endif
640     
641     schedulePostRunThread();
642
643     ready_to_gc = rtsFalse;
644
645     switch (ret) {
646     case HeapOverflow:
647         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
648         break;
649
650     case StackOverflow:
651         scheduleHandleStackOverflow(cap,task,t);
652         break;
653
654     case ThreadYielding:
655         if (scheduleHandleYield(cap, t, prev_what_next)) {
656             // shortcut for switching between compiler/interpreter:
657             goto run_thread; 
658         }
659         break;
660
661     case ThreadBlocked:
662         scheduleHandleThreadBlocked(t);
663         break;
664
665     case ThreadFinished:
666         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
667         break;
668
669     default:
670       barf("schedule: invalid thread return code %d", (int)ret);
671     }
672
673     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
674     if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
675   } /* end of while() */
676
677   IF_PAR_DEBUG(verbose,
678                debugBelch("== Leaving schedule() after having received Finish\n"));
679 }
680
681 /* ----------------------------------------------------------------------------
682  * Setting up the scheduler loop
683  * ------------------------------------------------------------------------- */
684
685 static void
686 schedulePreLoop(void)
687 {
688 #if defined(GRAN) 
689     /* set up first event to get things going */
690     /* ToDo: assign costs for system setup and init MainTSO ! */
691     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
692               ContinueThread, 
693               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
694     
695     IF_DEBUG(gran,
696              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
697                         CurrentTSO);
698              G_TSO(CurrentTSO, 5));
699     
700     if (RtsFlags.GranFlags.Light) {
701         /* Save current time; GranSim Light only */
702         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
703     }      
704 #endif
705 }
706
707 /* ----------------------------------------------------------------------------
708  * Start any pending signal handlers
709  * ------------------------------------------------------------------------- */
710
711 static void
712 scheduleStartSignalHandlers(void)
713 {
714 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
715     if (signals_pending()) { // safe outside the lock
716         startSignalHandlers();
717     }
718 #endif
719 }
720
721 /* ----------------------------------------------------------------------------
722  * Check for blocked threads that can be woken up.
723  * ------------------------------------------------------------------------- */
724
725 static void
726 scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
727 {
728 #if !defined(THREADED_RTS)
729     //
730     // Check whether any waiting threads need to be woken up.  If the
731     // run queue is empty, and there are no other tasks running, we
732     // can wait indefinitely for something to happen.
733     //
734     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
735     {
736         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
737     }
738 #endif
739 }
740
741
742 /* ----------------------------------------------------------------------------
743  * Check for threads blocked on BLACKHOLEs that can be woken up
744  * ------------------------------------------------------------------------- */
745 static void
746 scheduleCheckBlackHoles (Capability *cap)
747 {
748     if ( blackholes_need_checking ) // check without the lock first
749     {
750         ACQUIRE_LOCK(&sched_mutex);
751         if ( blackholes_need_checking ) {
752             checkBlackHoles(cap);
753             blackholes_need_checking = rtsFalse;
754         }
755         RELEASE_LOCK(&sched_mutex);
756     }
757 }
758
759 /* ----------------------------------------------------------------------------
760  * Detect deadlock conditions and attempt to resolve them.
761  * ------------------------------------------------------------------------- */
762
763 static void
764 scheduleDetectDeadlock (Capability *cap, Task *task)
765 {
766
767 #if defined(PARALLEL_HASKELL)
768     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
769     return;
770 #endif
771
772     /* 
773      * Detect deadlock: when we have no threads to run, there are no
774      * threads blocked, waiting for I/O, or sleeping, and all the
775      * other tasks are waiting for work, we must have a deadlock of
776      * some description.
777      */
778     if ( emptyThreadQueues(cap) )
779     {
780 #if defined(THREADED_RTS)
781         /* 
782          * In the threaded RTS, we only check for deadlock if there
783          * has been no activity in a complete timeslice.  This means
784          * we won't eagerly start a full GC just because we don't have
785          * any threads to run currently.
786          */
787         if (recent_activity != ACTIVITY_INACTIVE) return;
788 #endif
789
790         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
791
792         // Garbage collection can release some new threads due to
793         // either (a) finalizers or (b) threads resurrected because
794         // they are unreachable and will therefore be sent an
795         // exception.  Any threads thus released will be immediately
796         // runnable.
797         scheduleDoGC( cap, task, rtsTrue/*force  major GC*/ );
798         recent_activity = ACTIVITY_DONE_GC;
799         
800         if ( !emptyRunQueue(cap) ) return;
801
802 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
803         /* If we have user-installed signal handlers, then wait
804          * for signals to arrive rather then bombing out with a
805          * deadlock.
806          */
807         if ( anyUserHandlers() ) {
808             IF_DEBUG(scheduler, 
809                      sched_belch("still deadlocked, waiting for signals..."));
810
811             awaitUserSignals();
812
813             if (signals_pending()) {
814                 startSignalHandlers();
815             }
816
817             // either we have threads to run, or we were interrupted:
818             ASSERT(!emptyRunQueue(cap) || interrupted);
819         }
820 #endif
821
822 #if !defined(THREADED_RTS)
823         /* Probably a real deadlock.  Send the current main thread the
824          * Deadlock exception.
825          */
826         if (task->tso) {
827             switch (task->tso->why_blocked) {
828             case BlockedOnSTM:
829             case BlockedOnBlackHole:
830             case BlockedOnException:
831             case BlockedOnMVar:
832                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
833                 return;
834             default:
835                 barf("deadlock: main thread blocked in a strange way");
836             }
837         }
838         return;
839 #endif
840     }
841 }
842
843 /* ----------------------------------------------------------------------------
844  * Process an event (GRAN only)
845  * ------------------------------------------------------------------------- */
846
847 #if defined(GRAN)
848 static StgTSO *
849 scheduleProcessEvent(rtsEvent *event)
850 {
851     StgTSO *t;
852
853     if (RtsFlags.GranFlags.Light)
854       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
855
856     /* adjust time based on time-stamp */
857     if (event->time > CurrentTime[CurrentProc] &&
858         event->evttype != ContinueThread)
859       CurrentTime[CurrentProc] = event->time;
860     
861     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
862     if (!RtsFlags.GranFlags.Light)
863       handleIdlePEs();
864
865     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
866
867     /* main event dispatcher in GranSim */
868     switch (event->evttype) {
869       /* Should just be continuing execution */
870     case ContinueThread:
871       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
872       /* ToDo: check assertion
873       ASSERT(run_queue_hd != (StgTSO*)NULL &&
874              run_queue_hd != END_TSO_QUEUE);
875       */
876       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
877       if (!RtsFlags.GranFlags.DoAsyncFetch &&
878           procStatus[CurrentProc]==Fetching) {
879         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
880               CurrentTSO->id, CurrentTSO, CurrentProc);
881         goto next_thread;
882       } 
883       /* Ignore ContinueThreads for completed threads */
884       if (CurrentTSO->what_next == ThreadComplete) {
885         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
886               CurrentTSO->id, CurrentTSO, CurrentProc);
887         goto next_thread;
888       } 
889       /* Ignore ContinueThreads for threads that are being migrated */
890       if (PROCS(CurrentTSO)==Nowhere) { 
891         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
892               CurrentTSO->id, CurrentTSO, CurrentProc);
893         goto next_thread;
894       }
895       /* The thread should be at the beginning of the run queue */
896       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
897         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
898               CurrentTSO->id, CurrentTSO, CurrentProc);
899         break; // run the thread anyway
900       }
901       /*
902       new_event(proc, proc, CurrentTime[proc],
903                 FindWork,
904                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
905       goto next_thread; 
906       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
907       break; // now actually run the thread; DaH Qu'vam yImuHbej 
908
909     case FetchNode:
910       do_the_fetchnode(event);
911       goto next_thread;             /* handle next event in event queue  */
912       
913     case GlobalBlock:
914       do_the_globalblock(event);
915       goto next_thread;             /* handle next event in event queue  */
916       
917     case FetchReply:
918       do_the_fetchreply(event);
919       goto next_thread;             /* handle next event in event queue  */
920       
921     case UnblockThread:   /* Move from the blocked queue to the tail of */
922       do_the_unblock(event);
923       goto next_thread;             /* handle next event in event queue  */
924       
925     case ResumeThread:  /* Move from the blocked queue to the tail of */
926       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
927       event->tso->gran.blocktime += 
928         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
929       do_the_startthread(event);
930       goto next_thread;             /* handle next event in event queue  */
931       
932     case StartThread:
933       do_the_startthread(event);
934       goto next_thread;             /* handle next event in event queue  */
935       
936     case MoveThread:
937       do_the_movethread(event);
938       goto next_thread;             /* handle next event in event queue  */
939       
940     case MoveSpark:
941       do_the_movespark(event);
942       goto next_thread;             /* handle next event in event queue  */
943       
944     case FindWork:
945       do_the_findwork(event);
946       goto next_thread;             /* handle next event in event queue  */
947       
948     default:
949       barf("Illegal event type %u\n", event->evttype);
950     }  /* switch */
951     
952     /* This point was scheduler_loop in the old RTS */
953
954     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
955
956     TimeOfLastEvent = CurrentTime[CurrentProc];
957     TimeOfNextEvent = get_time_of_next_event();
958     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
959     // CurrentTSO = ThreadQueueHd;
960
961     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
962                          TimeOfNextEvent));
963
964     if (RtsFlags.GranFlags.Light) 
965       GranSimLight_leave_system(event, &ActiveTSO); 
966
967     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
968
969     IF_DEBUG(gran, 
970              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
971
972     /* in a GranSim setup the TSO stays on the run queue */
973     t = CurrentTSO;
974     /* Take a thread from the run queue. */
975     POP_RUN_QUEUE(t); // take_off_run_queue(t);
976
977     IF_DEBUG(gran, 
978              debugBelch("GRAN: About to run current thread, which is\n");
979              G_TSO(t,5));
980
981     context_switch = 0; // turned on via GranYield, checking events and time slice
982
983     IF_DEBUG(gran, 
984              DumpGranEvent(GR_SCHEDULE, t));
985
986     procStatus[CurrentProc] = Busy;
987 }
988 #endif // GRAN
989
990 /* ----------------------------------------------------------------------------
991  * Send pending messages (PARALLEL_HASKELL only)
992  * ------------------------------------------------------------------------- */
993
994 #if defined(PARALLEL_HASKELL)
995 static StgTSO *
996 scheduleSendPendingMessages(void)
997 {
998     StgSparkPool *pool;
999     rtsSpark spark;
1000     StgTSO *t;
1001
1002 # if defined(PAR) // global Mem.Mgmt., omit for now
1003     if (PendingFetches != END_BF_QUEUE) {
1004         processFetches();
1005     }
1006 # endif
1007     
1008     if (RtsFlags.ParFlags.BufferTime) {
1009         // if we use message buffering, we must send away all message
1010         // packets which have become too old...
1011         sendOldBuffers(); 
1012     }
1013 }
1014 #endif
1015
1016 /* ----------------------------------------------------------------------------
1017  * Activate spark threads (PARALLEL_HASKELL only)
1018  * ------------------------------------------------------------------------- */
1019
1020 #if defined(PARALLEL_HASKELL)
1021 static void
1022 scheduleActivateSpark(void)
1023 {
1024 #if defined(SPARKS)
1025   ASSERT(emptyRunQueue());
1026 /* We get here if the run queue is empty and want some work.
1027    We try to turn a spark into a thread, and add it to the run queue,
1028    from where it will be picked up in the next iteration of the scheduler
1029    loop.
1030 */
1031
1032       /* :-[  no local threads => look out for local sparks */
1033       /* the spark pool for the current PE */
1034       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1035       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1036           pool->hd < pool->tl) {
1037         /* 
1038          * ToDo: add GC code check that we really have enough heap afterwards!!
1039          * Old comment:
1040          * If we're here (no runnable threads) and we have pending
1041          * sparks, we must have a space problem.  Get enough space
1042          * to turn one of those pending sparks into a
1043          * thread... 
1044          */
1045
1046         spark = findSpark(rtsFalse);            /* get a spark */
1047         if (spark != (rtsSpark) NULL) {
1048           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1049           IF_PAR_DEBUG(fish, // schedule,
1050                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1051                              tso->id, tso, advisory_thread_count));
1052
1053           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1054             IF_PAR_DEBUG(fish, // schedule,
1055                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1056                             spark));
1057             return rtsFalse; /* failed to generate a thread */
1058           }                  /* otherwise fall through & pick-up new tso */
1059         } else {
1060           IF_PAR_DEBUG(fish, // schedule,
1061                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1062                              spark_queue_len(pool)));
1063           return rtsFalse;  /* failed to generate a thread */
1064         }
1065         return rtsTrue;  /* success in generating a thread */
1066   } else { /* no more threads permitted or pool empty */
1067     return rtsFalse;  /* failed to generateThread */
1068   }
1069 #else
1070   tso = NULL; // avoid compiler warning only
1071   return rtsFalse;  /* dummy in non-PAR setup */
1072 #endif // SPARKS
1073 }
1074 #endif // PARALLEL_HASKELL
1075
1076 /* ----------------------------------------------------------------------------
1077  * Get work from a remote node (PARALLEL_HASKELL only)
1078  * ------------------------------------------------------------------------- */
1079     
1080 #if defined(PARALLEL_HASKELL)
1081 static rtsBool
1082 scheduleGetRemoteWork(rtsBool *receivedFinish)
1083 {
1084   ASSERT(emptyRunQueue());
1085
1086   if (RtsFlags.ParFlags.BufferTime) {
1087         IF_PAR_DEBUG(verbose, 
1088                 debugBelch("...send all pending data,"));
1089         {
1090           nat i;
1091           for (i=1; i<=nPEs; i++)
1092             sendImmediately(i); // send all messages away immediately
1093         }
1094   }
1095 # ifndef SPARKS
1096         //++EDEN++ idle() , i.e. send all buffers, wait for work
1097         // suppress fishing in EDEN... just look for incoming messages
1098         // (blocking receive)
1099   IF_PAR_DEBUG(verbose, 
1100                debugBelch("...wait for incoming messages...\n"));
1101   *receivedFinish = processMessages(); // blocking receive...
1102
1103         // and reenter scheduling loop after having received something
1104         // (return rtsFalse below)
1105
1106 # else /* activate SPARKS machinery */
1107 /* We get here, if we have no work, tried to activate a local spark, but still
1108    have no work. We try to get a remote spark, by sending a FISH message.
1109    Thread migration should be added here, and triggered when a sequence of 
1110    fishes returns without work. */
1111         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1112
1113       /* =8-[  no local sparks => look for work on other PEs */
1114         /*
1115          * We really have absolutely no work.  Send out a fish
1116          * (there may be some out there already), and wait for
1117          * something to arrive.  We clearly can't run any threads
1118          * until a SCHEDULE or RESUME arrives, and so that's what
1119          * we're hoping to see.  (Of course, we still have to
1120          * respond to other types of messages.)
1121          */
1122         rtsTime now = msTime() /*CURRENT_TIME*/;
1123         IF_PAR_DEBUG(verbose, 
1124                      debugBelch("--  now=%ld\n", now));
1125         IF_PAR_DEBUG(fish, // verbose,
1126              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1127                  (last_fish_arrived_at!=0 &&
1128                   last_fish_arrived_at+delay > now)) {
1129                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1130                      now, last_fish_arrived_at+delay, 
1131                      last_fish_arrived_at,
1132                      delay);
1133              });
1134   
1135         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1136             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1137           if (last_fish_arrived_at==0 ||
1138               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1139             /* outstandingFishes is set in sendFish, processFish;
1140                avoid flooding system with fishes via delay */
1141     next_fish_to_send_at = 0;  
1142   } else {
1143     /* ToDo: this should be done in the main scheduling loop to avoid the
1144              busy wait here; not so bad if fish delay is very small  */
1145     int iq = 0; // DEBUGGING -- HWL
1146     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1147     /* send a fish when ready, but process messages that arrive in the meantime */
1148     do {
1149       if (PacketsWaiting()) {
1150         iq++; // DEBUGGING
1151         *receivedFinish = processMessages();
1152       }
1153       now = msTime();
1154     } while (!*receivedFinish || now<next_fish_to_send_at);
1155     // JB: This means the fish could become obsolete, if we receive
1156     // work. Better check for work again? 
1157     // last line: while (!receivedFinish || !haveWork || now<...)
1158     // next line: if (receivedFinish || haveWork )
1159
1160     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1161       return rtsFalse;  // NB: this will leave scheduler loop
1162                         // immediately after return!
1163                           
1164     IF_PAR_DEBUG(fish, // verbose,
1165                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1166
1167   }
1168
1169     // JB: IMHO, this should all be hidden inside sendFish(...)
1170     /* pe = choosePE(); 
1171        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1172                 NEW_FISH_HUNGER);
1173
1174     // Global statistics: count no. of fishes
1175     if (RtsFlags.ParFlags.ParStats.Global &&
1176          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1177            globalParStats.tot_fish_mess++;
1178            }
1179     */ 
1180
1181   /* delayed fishes must have been sent by now! */
1182   next_fish_to_send_at = 0;  
1183   }
1184       
1185   *receivedFinish = processMessages();
1186 # endif /* SPARKS */
1187
1188  return rtsFalse;
1189  /* NB: this function always returns rtsFalse, meaning the scheduler
1190     loop continues with the next iteration; 
1191     rationale: 
1192       return code means success in finding work; we enter this function
1193       if there is no local work, thus have to send a fish which takes
1194       time until it arrives with work; in the meantime we should process
1195       messages in the main loop;
1196  */
1197 }
1198 #endif // PARALLEL_HASKELL
1199
1200 /* ----------------------------------------------------------------------------
1201  * PAR/GRAN: Report stats & debugging info(?)
1202  * ------------------------------------------------------------------------- */
1203
1204 #if defined(PAR) || defined(GRAN)
1205 static void
1206 scheduleGranParReport(void)
1207 {
1208   ASSERT(run_queue_hd != END_TSO_QUEUE);
1209
1210   /* Take a thread from the run queue, if we have work */
1211   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1212
1213     /* If this TSO has got its outport closed in the meantime, 
1214      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1215      * It has to be marked as TH_DEAD for this purpose.
1216      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1217
1218 JB: TODO: investigate wether state change field could be nuked
1219      entirely and replaced by the normal tso state (whatnext
1220      field). All we want to do is to kill tsos from outside.
1221      */
1222
1223     /* ToDo: write something to the log-file
1224     if (RTSflags.ParFlags.granSimStats && !sameThread)
1225         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1226
1227     CurrentTSO = t;
1228     */
1229     /* the spark pool for the current PE */
1230     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1231
1232     IF_DEBUG(scheduler, 
1233              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1234                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1235
1236     IF_PAR_DEBUG(fish,
1237              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1238                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1239
1240     if (RtsFlags.ParFlags.ParStats.Full && 
1241         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1242         (emitSchedule || // forced emit
1243          (t && LastTSO && t->id != LastTSO->id))) {
1244       /* 
1245          we are running a different TSO, so write a schedule event to log file
1246          NB: If we use fair scheduling we also have to write  a deschedule 
1247              event for LastTSO; with unfair scheduling we know that the
1248              previous tso has blocked whenever we switch to another tso, so
1249              we don't need it in GUM for now
1250       */
1251       IF_PAR_DEBUG(fish, // schedule,
1252                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1253
1254       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1255                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1256       emitSchedule = rtsFalse;
1257     }
1258 }     
1259 #endif
1260
1261 /* ----------------------------------------------------------------------------
1262  * After running a thread...
1263  * ------------------------------------------------------------------------- */
1264
1265 static void
1266 schedulePostRunThread(void)
1267 {
1268 #if defined(PAR)
1269     /* HACK 675: if the last thread didn't yield, make sure to print a 
1270        SCHEDULE event to the log file when StgRunning the next thread, even
1271        if it is the same one as before */
1272     LastTSO = t; 
1273     TimeOfLastYield = CURRENT_TIME;
1274 #endif
1275
1276   /* some statistics gathering in the parallel case */
1277
1278 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1279   switch (ret) {
1280     case HeapOverflow:
1281 # if defined(GRAN)
1282       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1283       globalGranStats.tot_heapover++;
1284 # elif defined(PAR)
1285       globalParStats.tot_heapover++;
1286 # endif
1287       break;
1288
1289      case StackOverflow:
1290 # if defined(GRAN)
1291       IF_DEBUG(gran, 
1292                DumpGranEvent(GR_DESCHEDULE, t));
1293       globalGranStats.tot_stackover++;
1294 # elif defined(PAR)
1295       // IF_DEBUG(par, 
1296       // DumpGranEvent(GR_DESCHEDULE, t);
1297       globalParStats.tot_stackover++;
1298 # endif
1299       break;
1300
1301     case ThreadYielding:
1302 # if defined(GRAN)
1303       IF_DEBUG(gran, 
1304                DumpGranEvent(GR_DESCHEDULE, t));
1305       globalGranStats.tot_yields++;
1306 # elif defined(PAR)
1307       // IF_DEBUG(par, 
1308       // DumpGranEvent(GR_DESCHEDULE, t);
1309       globalParStats.tot_yields++;
1310 # endif
1311       break; 
1312
1313     case ThreadBlocked:
1314 # if defined(GRAN)
1315       IF_DEBUG(scheduler,
1316                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1317                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1318                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1319                if (t->block_info.closure!=(StgClosure*)NULL)
1320                  print_bq(t->block_info.closure);
1321                debugBelch("\n"));
1322
1323       // ??? needed; should emit block before
1324       IF_DEBUG(gran, 
1325                DumpGranEvent(GR_DESCHEDULE, t)); 
1326       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1327       /*
1328         ngoq Dogh!
1329       ASSERT(procStatus[CurrentProc]==Busy || 
1330               ((procStatus[CurrentProc]==Fetching) && 
1331               (t->block_info.closure!=(StgClosure*)NULL)));
1332       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1333           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1334             procStatus[CurrentProc]==Fetching)) 
1335         procStatus[CurrentProc] = Idle;
1336       */
1337 # elif defined(PAR)
1338 //++PAR++  blockThread() writes the event (change?)
1339 # endif
1340     break;
1341
1342   case ThreadFinished:
1343     break;
1344
1345   default:
1346     barf("parGlobalStats: unknown return code");
1347     break;
1348     }
1349 #endif
1350 }
1351
1352 /* -----------------------------------------------------------------------------
1353  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1354  * -------------------------------------------------------------------------- */
1355
1356 static rtsBool
1357 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1358 {
1359     // did the task ask for a large block?
1360     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1361         // if so, get one and push it on the front of the nursery.
1362         bdescr *bd;
1363         lnat blocks;
1364         
1365         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1366         
1367         IF_DEBUG(scheduler,
1368                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1369                             (long)t->id, whatNext_strs[t->what_next], blocks));
1370         
1371         // don't do this if the nursery is (nearly) full, we'll GC first.
1372         if (cap->r.rCurrentNursery->link != NULL ||
1373             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1374                                                // if the nursery has only one block.
1375             
1376             ACQUIRE_SM_LOCK
1377             bd = allocGroup( blocks );
1378             RELEASE_SM_LOCK
1379             cap->r.rNursery->n_blocks += blocks;
1380             
1381             // link the new group into the list
1382             bd->link = cap->r.rCurrentNursery;
1383             bd->u.back = cap->r.rCurrentNursery->u.back;
1384             if (cap->r.rCurrentNursery->u.back != NULL) {
1385                 cap->r.rCurrentNursery->u.back->link = bd;
1386             } else {
1387 #if !defined(SMP)
1388                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1389                        g0s0 == cap->r.rNursery);
1390 #endif
1391                 cap->r.rNursery->blocks = bd;
1392             }             
1393             cap->r.rCurrentNursery->u.back = bd;
1394             
1395             // initialise it as a nursery block.  We initialise the
1396             // step, gen_no, and flags field of *every* sub-block in
1397             // this large block, because this is easier than making
1398             // sure that we always find the block head of a large
1399             // block whenever we call Bdescr() (eg. evacuate() and
1400             // isAlive() in the GC would both have to do this, at
1401             // least).
1402             { 
1403                 bdescr *x;
1404                 for (x = bd; x < bd + blocks; x++) {
1405                     x->step = cap->r.rNursery;
1406                     x->gen_no = 0;
1407                     x->flags = 0;
1408                 }
1409             }
1410             
1411             // This assert can be a killer if the app is doing lots
1412             // of large block allocations.
1413             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1414             
1415             // now update the nursery to point to the new block
1416             cap->r.rCurrentNursery = bd;
1417             
1418             // we might be unlucky and have another thread get on the
1419             // run queue before us and steal the large block, but in that
1420             // case the thread will just end up requesting another large
1421             // block.
1422             pushOnRunQueue(cap,t);
1423             return rtsFalse;  /* not actually GC'ing */
1424         }
1425     }
1426     
1427     IF_DEBUG(scheduler,
1428              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1429                         (long)t->id, whatNext_strs[t->what_next]));
1430 #if defined(GRAN)
1431     ASSERT(!is_on_queue(t,CurrentProc));
1432 #elif defined(PARALLEL_HASKELL)
1433     /* Currently we emit a DESCHEDULE event before GC in GUM.
1434        ToDo: either add separate event to distinguish SYSTEM time from rest
1435        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1436     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1437         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1438                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1439         emitSchedule = rtsTrue;
1440     }
1441 #endif
1442       
1443     pushOnRunQueue(cap,t);
1444     return rtsTrue;
1445     /* actual GC is done at the end of the while loop in schedule() */
1446 }
1447
1448 /* -----------------------------------------------------------------------------
1449  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1450  * -------------------------------------------------------------------------- */
1451
1452 static void
1453 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1454 {
1455     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1456                                   (long)t->id, whatNext_strs[t->what_next]));
1457     /* just adjust the stack for this thread, then pop it back
1458      * on the run queue.
1459      */
1460     { 
1461         /* enlarge the stack */
1462         StgTSO *new_t = threadStackOverflow(cap, t);
1463         
1464         /* This TSO has moved, so update any pointers to it from the
1465          * main thread stack.  It better not be on any other queues...
1466          * (it shouldn't be).
1467          */
1468         if (task->tso != NULL) {
1469             task->tso = new_t;
1470         }
1471         pushOnRunQueue(cap,new_t);
1472     }
1473 }
1474
1475 /* -----------------------------------------------------------------------------
1476  * Handle a thread that returned to the scheduler with ThreadYielding
1477  * -------------------------------------------------------------------------- */
1478
1479 static rtsBool
1480 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1481 {
1482     // Reset the context switch flag.  We don't do this just before
1483     // running the thread, because that would mean we would lose ticks
1484     // during GC, which can lead to unfair scheduling (a thread hogs
1485     // the CPU because the tick always arrives during GC).  This way
1486     // penalises threads that do a lot of allocation, but that seems
1487     // better than the alternative.
1488     context_switch = 0;
1489     
1490     /* put the thread back on the run queue.  Then, if we're ready to
1491      * GC, check whether this is the last task to stop.  If so, wake
1492      * up the GC thread.  getThread will block during a GC until the
1493      * GC is finished.
1494      */
1495     IF_DEBUG(scheduler,
1496              if (t->what_next != prev_what_next) {
1497                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1498                             (long)t->id, whatNext_strs[t->what_next]);
1499              } else {
1500                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1501                             (long)t->id, whatNext_strs[t->what_next]);
1502              }
1503         );
1504     
1505     IF_DEBUG(sanity,
1506              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1507              checkTSO(t));
1508     ASSERT(t->link == END_TSO_QUEUE);
1509     
1510     // Shortcut if we're just switching evaluators: don't bother
1511     // doing stack squeezing (which can be expensive), just run the
1512     // thread.
1513     if (t->what_next != prev_what_next) {
1514         return rtsTrue;
1515     }
1516     
1517 #if defined(GRAN)
1518     ASSERT(!is_on_queue(t,CurrentProc));
1519       
1520     IF_DEBUG(sanity,
1521              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1522              checkThreadQsSanity(rtsTrue));
1523
1524 #endif
1525
1526     addToRunQueue(cap,t);
1527
1528 #if defined(GRAN)
1529     /* add a ContinueThread event to actually process the thread */
1530     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1531               ContinueThread,
1532               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1533     IF_GRAN_DEBUG(bq, 
1534                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1535                   G_EVENTQ(0);
1536                   G_CURR_THREADQ(0));
1537 #endif
1538     return rtsFalse;
1539 }
1540
1541 /* -----------------------------------------------------------------------------
1542  * Handle a thread that returned to the scheduler with ThreadBlocked
1543  * -------------------------------------------------------------------------- */
1544
1545 static void
1546 scheduleHandleThreadBlocked( StgTSO *t
1547 #if !defined(GRAN) && !defined(DEBUG)
1548     STG_UNUSED
1549 #endif
1550     )
1551 {
1552 #if defined(GRAN)
1553     IF_DEBUG(scheduler,
1554              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1555                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1556              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1557     
1558     // ??? needed; should emit block before
1559     IF_DEBUG(gran, 
1560              DumpGranEvent(GR_DESCHEDULE, t)); 
1561     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1562     /*
1563       ngoq Dogh!
1564       ASSERT(procStatus[CurrentProc]==Busy || 
1565       ((procStatus[CurrentProc]==Fetching) && 
1566       (t->block_info.closure!=(StgClosure*)NULL)));
1567       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1568       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1569       procStatus[CurrentProc]==Fetching)) 
1570       procStatus[CurrentProc] = Idle;
1571     */
1572 #elif defined(PAR)
1573     IF_DEBUG(scheduler,
1574              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1575                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1576     IF_PAR_DEBUG(bq,
1577                  
1578                  if (t->block_info.closure!=(StgClosure*)NULL) 
1579                  print_bq(t->block_info.closure));
1580     
1581     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1582     blockThread(t);
1583     
1584     /* whatever we schedule next, we must log that schedule */
1585     emitSchedule = rtsTrue;
1586     
1587 #else /* !GRAN */
1588
1589       // We don't need to do anything.  The thread is blocked, and it
1590       // has tidied up its stack and placed itself on whatever queue
1591       // it needs to be on.
1592
1593 #if !defined(SMP)
1594     ASSERT(t->why_blocked != NotBlocked);
1595              // This might not be true under SMP: we don't have
1596              // exclusive access to this TSO, so someone might have
1597              // woken it up by now.  This actually happens: try
1598              // conc023 +RTS -N2.
1599 #endif
1600
1601     IF_DEBUG(scheduler,
1602              debugBelch("--<< thread %d (%s) stopped: ", 
1603                         t->id, whatNext_strs[t->what_next]);
1604              printThreadBlockage(t);
1605              debugBelch("\n"));
1606     
1607     /* Only for dumping event to log file 
1608        ToDo: do I need this in GranSim, too?
1609        blockThread(t);
1610     */
1611 #endif
1612 }
1613
1614 /* -----------------------------------------------------------------------------
1615  * Handle a thread that returned to the scheduler with ThreadFinished
1616  * -------------------------------------------------------------------------- */
1617
1618 static rtsBool
1619 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1620 {
1621     /* Need to check whether this was a main thread, and if so,
1622      * return with the return value.
1623      *
1624      * We also end up here if the thread kills itself with an
1625      * uncaught exception, see Exception.cmm.
1626      */
1627     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1628                                   t->id, whatNext_strs[t->what_next]));
1629
1630 #if defined(GRAN)
1631       endThread(t, CurrentProc); // clean-up the thread
1632 #elif defined(PARALLEL_HASKELL)
1633       /* For now all are advisory -- HWL */
1634       //if(t->priority==AdvisoryPriority) ??
1635       advisory_thread_count--; // JB: Caution with this counter, buggy!
1636       
1637 # if defined(DIST)
1638       if(t->dist.priority==RevalPriority)
1639         FinishReval(t);
1640 # endif
1641     
1642 # if defined(EDENOLD)
1643       // the thread could still have an outport... (BUG)
1644       if (t->eden.outport != -1) {
1645       // delete the outport for the tso which has finished...
1646         IF_PAR_DEBUG(eden_ports,
1647                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1648                               t->eden.outport, t->id));
1649         deleteOPT(t);
1650       }
1651       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1652       if (t->eden.epid != -1) {
1653         IF_PAR_DEBUG(eden_ports,
1654                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1655                            t->id, t->eden.epid));
1656         removeTSOfromProcess(t);
1657       }
1658 # endif 
1659
1660 # if defined(PAR)
1661       if (RtsFlags.ParFlags.ParStats.Full &&
1662           !RtsFlags.ParFlags.ParStats.Suppressed) 
1663         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1664
1665       //  t->par only contains statistics: left out for now...
1666       IF_PAR_DEBUG(fish,
1667                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1668                               t->id,t,t->par.sparkname));
1669 # endif
1670 #endif // PARALLEL_HASKELL
1671
1672       //
1673       // Check whether the thread that just completed was a bound
1674       // thread, and if so return with the result.  
1675       //
1676       // There is an assumption here that all thread completion goes
1677       // through this point; we need to make sure that if a thread
1678       // ends up in the ThreadKilled state, that it stays on the run
1679       // queue so it can be dealt with here.
1680       //
1681
1682       if (t->bound) {
1683
1684           if (t->bound != task) {
1685 #if !defined(THREADED_RTS)
1686               // Must be a bound thread that is not the topmost one.  Leave
1687               // it on the run queue until the stack has unwound to the
1688               // point where we can deal with this.  Leaving it on the run
1689               // queue also ensures that the garbage collector knows about
1690               // this thread and its return value (it gets dropped from the
1691               // all_threads list so there's no other way to find it).
1692               appendToRunQueue(cap,t);
1693               return rtsFalse;
1694 #else
1695               // this cannot happen in the threaded RTS, because a
1696               // bound thread can only be run by the appropriate Task.
1697               barf("finished bound thread that isn't mine");
1698 #endif
1699           }
1700
1701           ASSERT(task->tso == t);
1702
1703           if (t->what_next == ThreadComplete) {
1704               if (task->ret) {
1705                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1706                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1707               }
1708               task->stat = Success;
1709           } else {
1710               if (task->ret) {
1711                   *(task->ret) = NULL;
1712               }
1713               if (interrupted) {
1714                   task->stat = Interrupted;
1715               } else {
1716                   task->stat = Killed;
1717               }
1718           }
1719 #ifdef DEBUG
1720           removeThreadLabel((StgWord)task->tso->id);
1721 #endif
1722           return rtsTrue; // tells schedule() to return
1723       }
1724
1725       return rtsFalse;
1726 }
1727
1728 /* -----------------------------------------------------------------------------
1729  * Perform a heap census, if PROFILING
1730  * -------------------------------------------------------------------------- */
1731
1732 static rtsBool
1733 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1734 {
1735 #if defined(PROFILING)
1736     // When we have +RTS -i0 and we're heap profiling, do a census at
1737     // every GC.  This lets us get repeatable runs for debugging.
1738     if (performHeapProfile ||
1739         (RtsFlags.ProfFlags.profileInterval==0 &&
1740          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1741         GarbageCollect(GetRoots, rtsTrue);
1742         heapCensus();
1743         performHeapProfile = rtsFalse;
1744         return rtsTrue;  // true <=> we already GC'd
1745     }
1746 #endif
1747     return rtsFalse;
1748 }
1749
1750 /* -----------------------------------------------------------------------------
1751  * Perform a garbage collection if necessary
1752  * -------------------------------------------------------------------------- */
1753
1754 static void
1755 scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
1756 {
1757     StgTSO *t;
1758 #ifdef SMP
1759     static volatile StgWord waiting_for_gc;
1760     rtsBool was_waiting;
1761     nat i;
1762 #endif
1763
1764 #ifdef SMP
1765     // In order to GC, there must be no threads running Haskell code.
1766     // Therefore, the GC thread needs to hold *all* the capabilities,
1767     // and release them after the GC has completed.  
1768     //
1769     // This seems to be the simplest way: previous attempts involved
1770     // making all the threads with capabilities give up their
1771     // capabilities and sleep except for the *last* one, which
1772     // actually did the GC.  But it's quite hard to arrange for all
1773     // the other tasks to sleep and stay asleep.
1774     //
1775         
1776     was_waiting = cas(&waiting_for_gc, 0, 1);
1777     if (was_waiting) return;
1778
1779     for (i=0; i < n_capabilities; i++) {
1780         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1781         if (cap != &capabilities[i]) {
1782             Capability *pcap = &capabilities[i];
1783             // we better hope this task doesn't get migrated to
1784             // another Capability while we're waiting for this one.
1785             // It won't, because load balancing happens while we have
1786             // all the Capabilities, but even so it's a slightly
1787             // unsavoury invariant.
1788             task->cap = pcap;
1789             waitForReturnCapability(&pcap, task);
1790             if (pcap != &capabilities[i]) {
1791                 barf("scheduleDoGC: got the wrong capability");
1792             }
1793         }
1794     }
1795
1796     waiting_for_gc = rtsFalse;
1797 #endif
1798
1799     /* Kick any transactions which are invalid back to their
1800      * atomically frames.  When next scheduled they will try to
1801      * commit, this commit will fail and they will retry.
1802      */
1803     { 
1804         StgTSO *next;
1805
1806         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1807             if (t->what_next == ThreadRelocated) {
1808                 next = t->link;
1809             } else {
1810                 next = t->global_link;
1811                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1812                     if (!stmValidateNestOfTransactions (t -> trec)) {
1813                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1814                         
1815                         // strip the stack back to the
1816                         // ATOMICALLY_FRAME, aborting the (nested)
1817                         // transaction, and saving the stack of any
1818                         // partially-evaluated thunks on the heap.
1819                         raiseAsync_(cap, t, NULL, rtsTrue);
1820                         
1821 #ifdef REG_R1
1822                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1823 #endif
1824                     }
1825                 }
1826             }
1827         }
1828     }
1829     
1830     // so this happens periodically:
1831     scheduleCheckBlackHoles(cap);
1832     
1833     IF_DEBUG(scheduler, printAllThreads());
1834
1835     /* everybody back, start the GC.
1836      * Could do it in this thread, or signal a condition var
1837      * to do it in another thread.  Either way, we need to
1838      * broadcast on gc_pending_cond afterward.
1839      */
1840 #if defined(THREADED_RTS)
1841     IF_DEBUG(scheduler,sched_belch("doing GC"));
1842 #endif
1843     GarbageCollect(GetRoots, force_major);
1844     
1845 #if defined(SMP)
1846     // release our stash of capabilities.
1847     for (i = 0; i < n_capabilities; i++) {
1848         if (cap != &capabilities[i]) {
1849             task->cap = &capabilities[i];
1850             releaseCapability(&capabilities[i]);
1851         }
1852     }
1853     task->cap = cap;
1854 #endif
1855
1856 #if defined(GRAN)
1857     /* add a ContinueThread event to continue execution of current thread */
1858     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1859               ContinueThread,
1860               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1861     IF_GRAN_DEBUG(bq, 
1862                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1863                   G_EVENTQ(0);
1864                   G_CURR_THREADQ(0));
1865 #endif /* GRAN */
1866 }
1867
1868 /* ---------------------------------------------------------------------------
1869  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1870  * used by Control.Concurrent for error checking.
1871  * ------------------------------------------------------------------------- */
1872  
1873 StgBool
1874 rtsSupportsBoundThreads(void)
1875 {
1876 #if defined(THREADED_RTS)
1877   return rtsTrue;
1878 #else
1879   return rtsFalse;
1880 #endif
1881 }
1882
1883 /* ---------------------------------------------------------------------------
1884  * isThreadBound(tso): check whether tso is bound to an OS thread.
1885  * ------------------------------------------------------------------------- */
1886  
1887 StgBool
1888 isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
1889 {
1890 #if defined(THREADED_RTS)
1891   return (tso->bound != NULL);
1892 #endif
1893   return rtsFalse;
1894 }
1895
1896 /* ---------------------------------------------------------------------------
1897  * Singleton fork(). Do not copy any running threads.
1898  * ------------------------------------------------------------------------- */
1899
1900 #if !defined(mingw32_HOST_OS) && !defined(SMP)
1901 #define FORKPROCESS_PRIMOP_SUPPORTED
1902 #endif
1903
1904 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1905 static void 
1906 deleteThreadImmediately(Capability *cap, StgTSO *tso);
1907 #endif
1908 StgInt
1909 forkProcess(HsStablePtr *entry
1910 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1911             STG_UNUSED
1912 #endif
1913            )
1914 {
1915 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1916     pid_t pid;
1917     StgTSO* t,*next;
1918     Task *task;
1919     Capability *cap;
1920     
1921     IF_DEBUG(scheduler,sched_belch("forking!"));
1922     
1923     // ToDo: for SMP, we should probably acquire *all* the capabilities
1924     cap = rts_lock();
1925     
1926     pid = fork();
1927     
1928     if (pid) { // parent
1929         
1930         // just return the pid
1931         return pid;
1932         
1933     } else { // child
1934         
1935         // delete all threads
1936         cap->run_queue_hd = END_TSO_QUEUE;
1937         cap->run_queue_tl = END_TSO_QUEUE;
1938         
1939         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1940             next = t->link;
1941             
1942             // don't allow threads to catch the ThreadKilled exception
1943             deleteThreadImmediately(cap,t);
1944         }
1945         
1946         // wipe the main thread list
1947         while ((task = all_tasks) != NULL) {
1948             all_tasks = task->all_link;
1949             discardTask(task);
1950         }
1951         
1952         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1953         rts_checkSchedStatus("forkProcess",cap);
1954         
1955         rts_unlock(cap);
1956         hs_exit();                      // clean up and exit
1957         stg_exit(EXIT_SUCCESS);
1958     }
1959 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1960     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1961     return -1;
1962 #endif
1963 }
1964
1965 /* ---------------------------------------------------------------------------
1966  * Delete the threads on the run queue of the current capability.
1967  * ------------------------------------------------------------------------- */
1968    
1969 static void
1970 deleteRunQueue (Capability *cap)
1971 {
1972     StgTSO *t, *next;
1973     for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
1974         ASSERT(t->what_next != ThreadRelocated);
1975         next = t->link;
1976         deleteThread(cap, t);
1977     }
1978 }
1979
1980 /* startThread and  insertThread are now in GranSim.c -- HWL */
1981
1982
1983 /* -----------------------------------------------------------------------------
1984    Managing the suspended_ccalling_tasks list.
1985    Locks required: sched_mutex
1986    -------------------------------------------------------------------------- */
1987
1988 STATIC_INLINE void
1989 suspendTask (Capability *cap, Task *task)
1990 {
1991     ASSERT(task->next == NULL && task->prev == NULL);
1992     task->next = cap->suspended_ccalling_tasks;
1993     task->prev = NULL;
1994     if (cap->suspended_ccalling_tasks) {
1995         cap->suspended_ccalling_tasks->prev = task;
1996     }
1997     cap->suspended_ccalling_tasks = task;
1998 }
1999
2000 STATIC_INLINE void
2001 recoverSuspendedTask (Capability *cap, Task *task)
2002 {
2003     if (task->prev) {
2004         task->prev->next = task->next;
2005     } else {
2006         ASSERT(cap->suspended_ccalling_tasks == task);
2007         cap->suspended_ccalling_tasks = task->next;
2008     }
2009     if (task->next) {
2010         task->next->prev = task->prev;
2011     }
2012     task->next = task->prev = NULL;
2013 }
2014
2015 /* ---------------------------------------------------------------------------
2016  * Suspending & resuming Haskell threads.
2017  * 
2018  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2019  * its capability before calling the C function.  This allows another
2020  * task to pick up the capability and carry on running Haskell
2021  * threads.  It also means that if the C call blocks, it won't lock
2022  * the whole system.
2023  *
2024  * The Haskell thread making the C call is put to sleep for the
2025  * duration of the call, on the susepended_ccalling_threads queue.  We
2026  * give out a token to the task, which it can use to resume the thread
2027  * on return from the C function.
2028  * ------------------------------------------------------------------------- */
2029    
2030 void *
2031 suspendThread (StgRegTable *reg)
2032 {
2033   Capability *cap;
2034   int saved_errno = errno;
2035   StgTSO *tso;
2036   Task *task;
2037
2038   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2039    */
2040   cap = regTableToCapability(reg);
2041
2042   task = cap->running_task;
2043   tso = cap->r.rCurrentTSO;
2044
2045   IF_DEBUG(scheduler,
2046            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2047
2048   // XXX this might not be necessary --SDM
2049   tso->what_next = ThreadRunGHC;
2050
2051   threadPaused(tso);
2052
2053   if(tso->blocked_exceptions == NULL)  {
2054       tso->why_blocked = BlockedOnCCall;
2055       tso->blocked_exceptions = END_TSO_QUEUE;
2056   } else {
2057       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2058   }
2059
2060   // Hand back capability
2061   task->suspended_tso = tso;
2062
2063   ACQUIRE_LOCK(&cap->lock);
2064
2065   suspendTask(cap,task);
2066   cap->in_haskell = rtsFalse;
2067   releaseCapability_(cap);
2068   
2069   RELEASE_LOCK(&cap->lock);
2070
2071 #if defined(THREADED_RTS)
2072   /* Preparing to leave the RTS, so ensure there's a native thread/task
2073      waiting to take over.
2074   */
2075   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2076 #endif
2077
2078   errno = saved_errno;
2079   return task;
2080 }
2081
2082 StgRegTable *
2083 resumeThread (void *task_)
2084 {
2085     StgTSO *tso;
2086     Capability *cap;
2087     int saved_errno = errno;
2088     Task *task = task_;
2089
2090     cap = task->cap;
2091     // Wait for permission to re-enter the RTS with the result.
2092     waitForReturnCapability(&cap,task);
2093     // we might be on a different capability now... but if so, our
2094     // entry on the suspended_ccalling_tasks list will also have been
2095     // migrated.
2096
2097     // Remove the thread from the suspended list
2098     recoverSuspendedTask(cap,task);
2099
2100     tso = task->suspended_tso;
2101     task->suspended_tso = NULL;
2102     tso->link = END_TSO_QUEUE;
2103     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2104     
2105     if (tso->why_blocked == BlockedOnCCall) {
2106         awakenBlockedQueue(cap,tso->blocked_exceptions);
2107         tso->blocked_exceptions = NULL;
2108     }
2109     
2110     /* Reset blocking status */
2111     tso->why_blocked  = NotBlocked;
2112     
2113     cap->r.rCurrentTSO = tso;
2114     cap->in_haskell = rtsTrue;
2115     errno = saved_errno;
2116
2117     return &cap->r;
2118 }
2119
2120 /* ---------------------------------------------------------------------------
2121  * Comparing Thread ids.
2122  *
2123  * This is used from STG land in the implementation of the
2124  * instances of Eq/Ord for ThreadIds.
2125  * ------------------------------------------------------------------------ */
2126
2127 int
2128 cmp_thread(StgPtr tso1, StgPtr tso2) 
2129
2130   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2131   StgThreadID id2 = ((StgTSO *)tso2)->id;
2132  
2133   if (id1 < id2) return (-1);
2134   if (id1 > id2) return 1;
2135   return 0;
2136 }
2137
2138 /* ---------------------------------------------------------------------------
2139  * Fetching the ThreadID from an StgTSO.
2140  *
2141  * This is used in the implementation of Show for ThreadIds.
2142  * ------------------------------------------------------------------------ */
2143 int
2144 rts_getThreadId(StgPtr tso) 
2145 {
2146   return ((StgTSO *)tso)->id;
2147 }
2148
2149 #ifdef DEBUG
2150 void
2151 labelThread(StgPtr tso, char *label)
2152 {
2153   int len;
2154   void *buf;
2155
2156   /* Caveat: Once set, you can only set the thread name to "" */
2157   len = strlen(label)+1;
2158   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2159   strncpy(buf,label,len);
2160   /* Update will free the old memory for us */
2161   updateThreadLabel(((StgTSO *)tso)->id,buf);
2162 }
2163 #endif /* DEBUG */
2164
2165 /* ---------------------------------------------------------------------------
2166    Create a new thread.
2167
2168    The new thread starts with the given stack size.  Before the
2169    scheduler can run, however, this thread needs to have a closure
2170    (and possibly some arguments) pushed on its stack.  See
2171    pushClosure() in Schedule.h.
2172
2173    createGenThread() and createIOThread() (in SchedAPI.h) are
2174    convenient packaged versions of this function.
2175
2176    currently pri (priority) is only used in a GRAN setup -- HWL
2177    ------------------------------------------------------------------------ */
2178 #if defined(GRAN)
2179 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2180 StgTSO *
2181 createThread(nat size, StgInt pri)
2182 #else
2183 StgTSO *
2184 createThread(Capability *cap, nat size)
2185 #endif
2186 {
2187     StgTSO *tso;
2188     nat stack_size;
2189
2190     /* sched_mutex is *not* required */
2191
2192     /* First check whether we should create a thread at all */
2193 #if defined(PARALLEL_HASKELL)
2194     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2195     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2196         threadsIgnored++;
2197         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2198                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2199         return END_TSO_QUEUE;
2200     }
2201     threadsCreated++;
2202 #endif
2203
2204 #if defined(GRAN)
2205     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2206 #endif
2207
2208     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2209
2210     /* catch ridiculously small stack sizes */
2211     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2212         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2213     }
2214
2215     stack_size = size - TSO_STRUCT_SIZEW;
2216     
2217     tso = (StgTSO *)allocateLocal(cap, size);
2218     TICK_ALLOC_TSO(stack_size, 0);
2219
2220     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2221 #if defined(GRAN)
2222     SET_GRAN_HDR(tso, ThisPE);
2223 #endif
2224
2225     // Always start with the compiled code evaluator
2226     tso->what_next = ThreadRunGHC;
2227
2228     tso->why_blocked  = NotBlocked;
2229     tso->blocked_exceptions = NULL;
2230     
2231     tso->saved_errno = 0;
2232     tso->bound = NULL;
2233     
2234     tso->stack_size     = stack_size;
2235     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2236                           - TSO_STRUCT_SIZEW;
2237     tso->sp             = (P_)&(tso->stack) + stack_size;
2238
2239     tso->trec = NO_TREC;
2240     
2241 #ifdef PROFILING
2242     tso->prof.CCCS = CCS_MAIN;
2243 #endif
2244     
2245   /* put a stop frame on the stack */
2246     tso->sp -= sizeofW(StgStopFrame);
2247     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2248     tso->link = END_TSO_QUEUE;
2249     
2250   // ToDo: check this
2251 #if defined(GRAN)
2252     /* uses more flexible routine in GranSim */
2253     insertThread(tso, CurrentProc);
2254 #else
2255     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2256      * from its creation
2257      */
2258 #endif
2259     
2260 #if defined(GRAN) 
2261     if (RtsFlags.GranFlags.GranSimStats.Full) 
2262         DumpGranEvent(GR_START,tso);
2263 #elif defined(PARALLEL_HASKELL)
2264     if (RtsFlags.ParFlags.ParStats.Full) 
2265         DumpGranEvent(GR_STARTQ,tso);
2266     /* HACk to avoid SCHEDULE 
2267        LastTSO = tso; */
2268 #endif
2269     
2270     /* Link the new thread on the global thread list.
2271      */
2272     ACQUIRE_LOCK(&sched_mutex);
2273     tso->id = next_thread_id++;  // while we have the mutex
2274     tso->global_link = all_threads;
2275     all_threads = tso;
2276     RELEASE_LOCK(&sched_mutex);
2277     
2278 #if defined(DIST)
2279     tso->dist.priority = MandatoryPriority; //by default that is...
2280 #endif
2281     
2282 #if defined(GRAN)
2283     tso->gran.pri = pri;
2284 # if defined(DEBUG)
2285     tso->gran.magic = TSO_MAGIC; // debugging only
2286 # endif
2287     tso->gran.sparkname   = 0;
2288     tso->gran.startedat   = CURRENT_TIME; 
2289     tso->gran.exported    = 0;
2290     tso->gran.basicblocks = 0;
2291     tso->gran.allocs      = 0;
2292     tso->gran.exectime    = 0;
2293     tso->gran.fetchtime   = 0;
2294     tso->gran.fetchcount  = 0;
2295     tso->gran.blocktime   = 0;
2296     tso->gran.blockcount  = 0;
2297     tso->gran.blockedat   = 0;
2298     tso->gran.globalsparks = 0;
2299     tso->gran.localsparks  = 0;
2300     if (RtsFlags.GranFlags.Light)
2301         tso->gran.clock  = Now; /* local clock */
2302     else
2303         tso->gran.clock  = 0;
2304     
2305     IF_DEBUG(gran,printTSO(tso));
2306 #elif defined(PARALLEL_HASKELL)
2307 # if defined(DEBUG)
2308     tso->par.magic = TSO_MAGIC; // debugging only
2309 # endif
2310     tso->par.sparkname   = 0;
2311     tso->par.startedat   = CURRENT_TIME; 
2312     tso->par.exported    = 0;
2313     tso->par.basicblocks = 0;
2314     tso->par.allocs      = 0;
2315     tso->par.exectime    = 0;
2316     tso->par.fetchtime   = 0;
2317     tso->par.fetchcount  = 0;
2318     tso->par.blocktime   = 0;
2319     tso->par.blockcount  = 0;
2320     tso->par.blockedat   = 0;
2321     tso->par.globalsparks = 0;
2322     tso->par.localsparks  = 0;
2323 #endif
2324     
2325 #if defined(GRAN)
2326     globalGranStats.tot_threads_created++;
2327     globalGranStats.threads_created_on_PE[CurrentProc]++;
2328     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2329     globalGranStats.tot_sq_probes++;
2330 #elif defined(PARALLEL_HASKELL)
2331     // collect parallel global statistics (currently done together with GC stats)
2332     if (RtsFlags.ParFlags.ParStats.Global &&
2333         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2334         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2335         globalParStats.tot_threads_created++;
2336     }
2337 #endif 
2338     
2339 #if defined(GRAN)
2340     IF_GRAN_DEBUG(pri,
2341                   sched_belch("==__ schedule: Created TSO %d (%p);",
2342                               CurrentProc, tso, tso->id));
2343 #elif defined(PARALLEL_HASKELL)
2344     IF_PAR_DEBUG(verbose,
2345                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2346                              (long)tso->id, tso, advisory_thread_count));
2347 #else
2348     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2349                                    (long)tso->id, (long)tso->stack_size));
2350 #endif    
2351     return tso;
2352 }
2353
2354 #if defined(PAR)
2355 /* RFP:
2356    all parallel thread creation calls should fall through the following routine.
2357 */
2358 StgTSO *
2359 createThreadFromSpark(rtsSpark spark) 
2360 { StgTSO *tso;
2361   ASSERT(spark != (rtsSpark)NULL);
2362 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2363   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2364   { threadsIgnored++;
2365     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2366           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2367     return END_TSO_QUEUE;
2368   }
2369   else
2370   { threadsCreated++;
2371     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2372     if (tso==END_TSO_QUEUE)     
2373       barf("createSparkThread: Cannot create TSO");
2374 #if defined(DIST)
2375     tso->priority = AdvisoryPriority;
2376 #endif
2377     pushClosure(tso,spark);
2378     addToRunQueue(tso);
2379     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2380   }
2381   return tso;
2382 }
2383 #endif
2384
2385 /*
2386   Turn a spark into a thread.
2387   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2388 */
2389 #if 0
2390 StgTSO *
2391 activateSpark (rtsSpark spark) 
2392 {
2393   StgTSO *tso;
2394
2395   tso = createSparkThread(spark);
2396   if (RtsFlags.ParFlags.ParStats.Full) {   
2397     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2398       IF_PAR_DEBUG(verbose,
2399                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2400                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2401   }
2402   // ToDo: fwd info on local/global spark to thread -- HWL
2403   // tso->gran.exported =  spark->exported;
2404   // tso->gran.locked =   !spark->global;
2405   // tso->gran.sparkname = spark->name;
2406
2407   return tso;
2408 }
2409 #endif
2410
2411 /* ---------------------------------------------------------------------------
2412  * scheduleThread()
2413  *
2414  * scheduleThread puts a thread on the end  of the runnable queue.
2415  * This will usually be done immediately after a thread is created.
2416  * The caller of scheduleThread must create the thread using e.g.
2417  * createThread and push an appropriate closure
2418  * on this thread's stack before the scheduler is invoked.
2419  * ------------------------------------------------------------------------ */
2420
2421 void
2422 scheduleThread(Capability *cap, StgTSO *tso)
2423 {
2424     // The thread goes at the *end* of the run-queue, to avoid possible
2425     // starvation of any threads already on the queue.
2426     appendToRunQueue(cap,tso);
2427 }
2428
2429 Capability *
2430 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2431 {
2432     Task *task;
2433
2434     // We already created/initialised the Task
2435     task = cap->running_task;
2436
2437     // This TSO is now a bound thread; make the Task and TSO
2438     // point to each other.
2439     tso->bound = task;
2440
2441     task->tso = tso;
2442     task->ret = ret;
2443     task->stat = NoStatus;
2444
2445     appendToRunQueue(cap,tso);
2446
2447     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2448
2449 #if defined(GRAN)
2450     /* GranSim specific init */
2451     CurrentTSO = m->tso;                // the TSO to run
2452     procStatus[MainProc] = Busy;        // status of main PE
2453     CurrentProc = MainProc;             // PE to run it on
2454 #endif
2455
2456     cap = schedule(cap,task);
2457
2458     ASSERT(task->stat != NoStatus);
2459
2460     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2461     return cap;
2462 }
2463
2464 /* ----------------------------------------------------------------------------
2465  * Starting Tasks
2466  * ------------------------------------------------------------------------- */
2467
2468 #if defined(THREADED_RTS)
2469 void
2470 workerStart(Task *task)
2471 {
2472     Capability *cap;
2473
2474     // See startWorkerTask().
2475     ACQUIRE_LOCK(&task->lock);
2476     cap = task->cap;
2477     RELEASE_LOCK(&task->lock);
2478
2479     // set the thread-local pointer to the Task:
2480     taskEnter(task);
2481
2482     // schedule() runs without a lock.
2483     cap = schedule(cap,task);
2484
2485     // On exit from schedule(), we have a Capability.
2486     releaseCapability(cap);
2487     taskStop(task);
2488 }
2489 #endif
2490
2491 /* ---------------------------------------------------------------------------
2492  * initScheduler()
2493  *
2494  * Initialise the scheduler.  This resets all the queues - if the
2495  * queues contained any threads, they'll be garbage collected at the
2496  * next pass.
2497  *
2498  * ------------------------------------------------------------------------ */
2499
2500 void 
2501 initScheduler(void)
2502 {
2503 #if defined(GRAN)
2504   nat i;
2505   for (i=0; i<=MAX_PROC; i++) {
2506     run_queue_hds[i]      = END_TSO_QUEUE;
2507     run_queue_tls[i]      = END_TSO_QUEUE;
2508     blocked_queue_hds[i]  = END_TSO_QUEUE;
2509     blocked_queue_tls[i]  = END_TSO_QUEUE;
2510     ccalling_threadss[i]  = END_TSO_QUEUE;
2511     blackhole_queue[i]    = END_TSO_QUEUE;
2512     sleeping_queue        = END_TSO_QUEUE;
2513   }
2514 #elif !defined(THREADED_RTS)
2515   blocked_queue_hd  = END_TSO_QUEUE;
2516   blocked_queue_tl  = END_TSO_QUEUE;
2517   sleeping_queue    = END_TSO_QUEUE;
2518 #endif
2519
2520   blackhole_queue   = END_TSO_QUEUE;
2521   all_threads       = END_TSO_QUEUE;
2522
2523   context_switch = 0;
2524   interrupted    = 0;
2525
2526   RtsFlags.ConcFlags.ctxtSwitchTicks =
2527       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2528       
2529 #if defined(THREADED_RTS)
2530   /* Initialise the mutex and condition variables used by
2531    * the scheduler. */
2532   initMutex(&sched_mutex);
2533 #endif
2534   
2535   ACQUIRE_LOCK(&sched_mutex);
2536
2537   /* A capability holds the state a native thread needs in
2538    * order to execute STG code. At least one capability is
2539    * floating around (only SMP builds have more than one).
2540    */
2541   initCapabilities();
2542
2543   initTaskManager();
2544
2545 #if defined(SMP)
2546   /*
2547    * Eagerly start one worker to run each Capability, except for
2548    * Capability 0.  The idea is that we're probably going to start a
2549    * bound thread on Capability 0 pretty soon, so we don't want a
2550    * worker task hogging it.
2551    */
2552   { 
2553       nat i;
2554       Capability *cap;
2555       for (i = 1; i < n_capabilities; i++) {
2556           cap = &capabilities[i];
2557           ACQUIRE_LOCK(&cap->lock);
2558           startWorkerTask(cap, workerStart);
2559           RELEASE_LOCK(&cap->lock);
2560       }
2561   }
2562 #endif
2563
2564 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2565   initSparkPools();
2566 #endif
2567
2568   RELEASE_LOCK(&sched_mutex);
2569 }
2570
2571 void
2572 exitScheduler( void )
2573 {
2574     interrupted = rtsTrue;
2575     shutting_down_scheduler = rtsTrue;
2576
2577 #if defined(THREADED_RTS)
2578     { 
2579         Task *task;
2580         nat i;
2581         
2582         ACQUIRE_LOCK(&sched_mutex);
2583         task = newBoundTask();
2584         RELEASE_LOCK(&sched_mutex);
2585
2586         for (i = 0; i < n_capabilities; i++) {
2587             shutdownCapability(&capabilities[i], task);
2588         }
2589         boundTaskExiting(task);
2590         stopTaskManager();
2591     }
2592 #endif
2593 }
2594
2595 /* ---------------------------------------------------------------------------
2596    Where are the roots that we know about?
2597
2598         - all the threads on the runnable queue
2599         - all the threads on the blocked queue
2600         - all the threads on the sleeping queue
2601         - all the thread currently executing a _ccall_GC
2602         - all the "main threads"
2603      
2604    ------------------------------------------------------------------------ */
2605
2606 /* This has to be protected either by the scheduler monitor, or by the
2607         garbage collection monitor (probably the latter).
2608         KH @ 25/10/99
2609 */
2610
2611 void
2612 GetRoots( evac_fn evac )
2613 {
2614     nat i;
2615     Capability *cap;
2616     Task *task;
2617
2618 #if defined(GRAN)
2619     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2620         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2621             evac((StgClosure **)&run_queue_hds[i]);
2622         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2623             evac((StgClosure **)&run_queue_tls[i]);
2624         
2625         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2626             evac((StgClosure **)&blocked_queue_hds[i]);
2627         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2628             evac((StgClosure **)&blocked_queue_tls[i]);
2629         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2630             evac((StgClosure **)&ccalling_threads[i]);
2631     }
2632
2633     markEventQueue();
2634
2635 #else /* !GRAN */
2636
2637     for (i = 0; i < n_capabilities; i++) {
2638         cap = &capabilities[i];
2639         evac((StgClosure **)&cap->run_queue_hd);
2640         evac((StgClosure **)&cap->run_queue_tl);
2641         
2642         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2643              task=task->next) {
2644             evac((StgClosure **)&task->suspended_tso);
2645         }
2646     }
2647     
2648 #if !defined(THREADED_RTS)
2649     evac((StgClosure **)&blocked_queue_hd);
2650     evac((StgClosure **)&blocked_queue_tl);
2651     evac((StgClosure **)&sleeping_queue);
2652 #endif 
2653 #endif
2654
2655     evac((StgClosure **)&blackhole_queue);
2656
2657 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2658     markSparkQueue(evac);
2659 #endif
2660     
2661 #if defined(RTS_USER_SIGNALS)
2662     // mark the signal handlers (signals should be already blocked)
2663     markSignalHandlers(evac);
2664 #endif
2665 }
2666
2667 /* -----------------------------------------------------------------------------
2668    performGC
2669
2670    This is the interface to the garbage collector from Haskell land.
2671    We provide this so that external C code can allocate and garbage
2672    collect when called from Haskell via _ccall_GC.
2673
2674    It might be useful to provide an interface whereby the programmer
2675    can specify more roots (ToDo).
2676    
2677    This needs to be protected by the GC condition variable above.  KH.
2678    -------------------------------------------------------------------------- */
2679
2680 static void (*extra_roots)(evac_fn);
2681
2682 void
2683 performGC(void)
2684 {
2685 #ifdef THREADED_RTS
2686     // ToDo: we have to grab all the capabilities here.
2687     errorBelch("performGC not supported in threaded RTS (yet)");
2688     stg_exit(EXIT_FAILURE);
2689 #endif
2690     /* Obligated to hold this lock upon entry */
2691     GarbageCollect(GetRoots,rtsFalse);
2692 }
2693
2694 void
2695 performMajorGC(void)
2696 {
2697 #ifdef THREADED_RTS
2698     errorBelch("performMayjorGC not supported in threaded RTS (yet)");
2699     stg_exit(EXIT_FAILURE);
2700 #endif
2701     GarbageCollect(GetRoots,rtsTrue);
2702 }
2703
2704 static void
2705 AllRoots(evac_fn evac)
2706 {
2707     GetRoots(evac);             // the scheduler's roots
2708     extra_roots(evac);          // the user's roots
2709 }
2710
2711 void
2712 performGCWithRoots(void (*get_roots)(evac_fn))
2713 {
2714 #ifdef THREADED_RTS
2715     errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
2716     stg_exit(EXIT_FAILURE);
2717 #endif
2718     extra_roots = get_roots;
2719     GarbageCollect(AllRoots,rtsFalse);
2720 }
2721
2722 /* -----------------------------------------------------------------------------
2723    Stack overflow
2724
2725    If the thread has reached its maximum stack size, then raise the
2726    StackOverflow exception in the offending thread.  Otherwise
2727    relocate the TSO into a larger chunk of memory and adjust its stack
2728    size appropriately.
2729    -------------------------------------------------------------------------- */
2730
2731 static StgTSO *
2732 threadStackOverflow(Capability *cap, StgTSO *tso)
2733 {
2734   nat new_stack_size, stack_words;
2735   lnat new_tso_size;
2736   StgPtr new_sp;
2737   StgTSO *dest;
2738
2739   IF_DEBUG(sanity,checkTSO(tso));
2740   if (tso->stack_size >= tso->max_stack_size) {
2741
2742     IF_DEBUG(gc,
2743              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2744                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2745              /* If we're debugging, just print out the top of the stack */
2746              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2747                                               tso->sp+64)));
2748
2749     /* Send this thread the StackOverflow exception */
2750     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2751     return tso;
2752   }
2753
2754   /* Try to double the current stack size.  If that takes us over the
2755    * maximum stack size for this thread, then use the maximum instead.
2756    * Finally round up so the TSO ends up as a whole number of blocks.
2757    */
2758   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2759   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2760                                        TSO_STRUCT_SIZE)/sizeof(W_);
2761   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2762   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2763
2764   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", tso->stack_size, new_stack_size));
2765
2766   dest = (StgTSO *)allocate(new_tso_size);
2767   TICK_ALLOC_TSO(new_stack_size,0);
2768
2769   /* copy the TSO block and the old stack into the new area */
2770   memcpy(dest,tso,TSO_STRUCT_SIZE);
2771   stack_words = tso->stack + tso->stack_size - tso->sp;
2772   new_sp = (P_)dest + new_tso_size - stack_words;
2773   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2774
2775   /* relocate the stack pointers... */
2776   dest->sp         = new_sp;
2777   dest->stack_size = new_stack_size;
2778         
2779   /* Mark the old TSO as relocated.  We have to check for relocated
2780    * TSOs in the garbage collector and any primops that deal with TSOs.
2781    *
2782    * It's important to set the sp value to just beyond the end
2783    * of the stack, so we don't attempt to scavenge any part of the
2784    * dead TSO's stack.
2785    */
2786   tso->what_next = ThreadRelocated;
2787   tso->link = dest;
2788   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2789   tso->why_blocked = NotBlocked;
2790
2791   IF_PAR_DEBUG(verbose,
2792                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2793                      tso->id, tso, tso->stack_size);
2794                /* If we're debugging, just print out the top of the stack */
2795                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2796                                                 tso->sp+64)));
2797   
2798   IF_DEBUG(sanity,checkTSO(tso));
2799 #if 0
2800   IF_DEBUG(scheduler,printTSO(dest));
2801 #endif
2802
2803   return dest;
2804 }
2805
2806 /* ---------------------------------------------------------------------------
2807    Wake up a queue that was blocked on some resource.
2808    ------------------------------------------------------------------------ */
2809
2810 #if defined(GRAN)
2811 STATIC_INLINE void
2812 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2813 {
2814 }
2815 #elif defined(PARALLEL_HASKELL)
2816 STATIC_INLINE void
2817 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2818 {
2819   /* write RESUME events to log file and
2820      update blocked and fetch time (depending on type of the orig closure) */
2821   if (RtsFlags.ParFlags.ParStats.Full) {
2822     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2823                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2824                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2825     if (emptyRunQueue())
2826       emitSchedule = rtsTrue;
2827
2828     switch (get_itbl(node)->type) {
2829         case FETCH_ME_BQ:
2830           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2831           break;
2832         case RBH:
2833         case FETCH_ME:
2834         case BLACKHOLE_BQ:
2835           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2836           break;
2837 #ifdef DIST
2838         case MVAR:
2839           break;
2840 #endif    
2841         default:
2842           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2843         }
2844       }
2845 }
2846 #endif
2847
2848 #if defined(GRAN)
2849 StgBlockingQueueElement *
2850 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2851 {
2852     StgTSO *tso;
2853     PEs node_loc, tso_loc;
2854
2855     node_loc = where_is(node); // should be lifted out of loop
2856     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2857     tso_loc = where_is((StgClosure *)tso);
2858     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2859       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2860       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2861       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2862       // insertThread(tso, node_loc);
2863       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2864                 ResumeThread,
2865                 tso, node, (rtsSpark*)NULL);
2866       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2867       // len_local++;
2868       // len++;
2869     } else { // TSO is remote (actually should be FMBQ)
2870       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2871                                   RtsFlags.GranFlags.Costs.gunblocktime +
2872                                   RtsFlags.GranFlags.Costs.latency;
2873       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2874                 UnblockThread,
2875                 tso, node, (rtsSpark*)NULL);
2876       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2877       // len++;
2878     }
2879     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2880     IF_GRAN_DEBUG(bq,
2881                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2882                           (node_loc==tso_loc ? "Local" : "Global"), 
2883                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2884     tso->block_info.closure = NULL;
2885     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2886                              tso->id, tso));
2887 }
2888 #elif defined(PARALLEL_HASKELL)
2889 StgBlockingQueueElement *
2890 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2891 {
2892     StgBlockingQueueElement *next;
2893
2894     switch (get_itbl(bqe)->type) {
2895     case TSO:
2896       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2897       /* if it's a TSO just push it onto the run_queue */
2898       next = bqe->link;
2899       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2900       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
2901       threadRunnable();
2902       unblockCount(bqe, node);
2903       /* reset blocking status after dumping event */
2904       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2905       break;
2906
2907     case BLOCKED_FETCH:
2908       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2909       next = bqe->link;
2910       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2911       PendingFetches = (StgBlockedFetch *)bqe;
2912       break;
2913
2914 # if defined(DEBUG)
2915       /* can ignore this case in a non-debugging setup; 
2916          see comments on RBHSave closures above */
2917     case CONSTR:
2918       /* check that the closure is an RBHSave closure */
2919       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2920              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2921              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2922       break;
2923
2924     default:
2925       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2926            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2927            (StgClosure *)bqe);
2928 # endif
2929     }
2930   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
2931   return next;
2932 }
2933 #endif
2934
2935 StgTSO *
2936 unblockOne(Capability *cap, StgTSO *tso)
2937 {
2938   StgTSO *next;
2939
2940   ASSERT(get_itbl(tso)->type == TSO);
2941   ASSERT(tso->why_blocked != NotBlocked);
2942   tso->why_blocked = NotBlocked;
2943   next = tso->link;
2944   tso->link = END_TSO_QUEUE;
2945
2946   // We might have just migrated this TSO to our Capability:
2947   if (tso->bound) {
2948       tso->bound->cap = cap;
2949   }
2950
2951   appendToRunQueue(cap,tso);
2952
2953   // we're holding a newly woken thread, make sure we context switch
2954   // quickly so we can migrate it if necessary.
2955   context_switch = 1;
2956   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
2957   return next;
2958 }
2959
2960
2961 #if defined(GRAN)
2962 void 
2963 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2964 {
2965   StgBlockingQueueElement *bqe;
2966   PEs node_loc;
2967   nat len = 0; 
2968
2969   IF_GRAN_DEBUG(bq, 
2970                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
2971                       node, CurrentProc, CurrentTime[CurrentProc], 
2972                       CurrentTSO->id, CurrentTSO));
2973
2974   node_loc = where_is(node);
2975
2976   ASSERT(q == END_BQ_QUEUE ||
2977          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2978          get_itbl(q)->type == CONSTR); // closure (type constructor)
2979   ASSERT(is_unique(node));
2980
2981   /* FAKE FETCH: magically copy the node to the tso's proc;
2982      no Fetch necessary because in reality the node should not have been 
2983      moved to the other PE in the first place
2984   */
2985   if (CurrentProc!=node_loc) {
2986     IF_GRAN_DEBUG(bq, 
2987                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
2988                         node, node_loc, CurrentProc, CurrentTSO->id, 
2989                         // CurrentTSO, where_is(CurrentTSO),
2990                         node->header.gran.procs));
2991     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2992     IF_GRAN_DEBUG(bq, 
2993                   debugBelch("## new bitmask of node %p is %#x\n",
2994                         node, node->header.gran.procs));
2995     if (RtsFlags.GranFlags.GranSimStats.Global) {
2996       globalGranStats.tot_fake_fetches++;
2997     }
2998   }
2999
3000   bqe = q;
3001   // ToDo: check: ASSERT(CurrentProc==node_loc);
3002   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3003     //next = bqe->link;
3004     /* 
3005        bqe points to the current element in the queue
3006        next points to the next element in the queue
3007     */
3008     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3009     //tso_loc = where_is(tso);
3010     len++;
3011     bqe = unblockOne(bqe, node);
3012   }
3013
3014   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3015      the closure to make room for the anchor of the BQ */
3016   if (bqe!=END_BQ_QUEUE) {
3017     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3018     /*
3019     ASSERT((info_ptr==&RBH_Save_0_info) ||
3020            (info_ptr==&RBH_Save_1_info) ||
3021            (info_ptr==&RBH_Save_2_info));
3022     */
3023     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3024     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3025     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3026
3027     IF_GRAN_DEBUG(bq,
3028                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3029                         node, info_type(node)));
3030   }
3031
3032   /* statistics gathering */
3033   if (RtsFlags.GranFlags.GranSimStats.Global) {
3034     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3035     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3036     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3037     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3038   }
3039   IF_GRAN_DEBUG(bq,
3040                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3041                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3042 }
3043 #elif defined(PARALLEL_HASKELL)
3044 void 
3045 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3046 {
3047   StgBlockingQueueElement *bqe;
3048
3049   IF_PAR_DEBUG(verbose, 
3050                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3051                      node, mytid));
3052 #ifdef DIST  
3053   //RFP
3054   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3055     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3056     return;
3057   }
3058 #endif
3059   
3060   ASSERT(q == END_BQ_QUEUE ||
3061          get_itbl(q)->type == TSO ||           
3062          get_itbl(q)->type == BLOCKED_FETCH || 
3063          get_itbl(q)->type == CONSTR); 
3064
3065   bqe = q;
3066   while (get_itbl(bqe)->type==TSO || 
3067          get_itbl(bqe)->type==BLOCKED_FETCH) {
3068     bqe = unblockOne(bqe, node);
3069   }
3070 }
3071
3072 #else   /* !GRAN && !PARALLEL_HASKELL */
3073
3074 void
3075 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3076 {
3077     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3078                              // Exception.cmm
3079     while (tso != END_TSO_QUEUE) {
3080         tso = unblockOne(cap,tso);
3081     }
3082 }
3083 #endif
3084
3085 /* ---------------------------------------------------------------------------
3086    Interrupt execution
3087    - usually called inside a signal handler so it mustn't do anything fancy.   
3088    ------------------------------------------------------------------------ */
3089
3090 void
3091 interruptStgRts(void)
3092 {
3093     interrupted    = 1;
3094     context_switch = 1;
3095 #if defined(THREADED_RTS)
3096     prodAllCapabilities();
3097 #endif
3098 }
3099
3100 /* -----------------------------------------------------------------------------
3101    Unblock a thread
3102
3103    This is for use when we raise an exception in another thread, which
3104    may be blocked.
3105    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3106    -------------------------------------------------------------------------- */
3107
3108 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3109 /*
3110   NB: only the type of the blocking queue is different in GranSim and GUM
3111       the operations on the queue-elements are the same
3112       long live polymorphism!
3113
3114   Locks: sched_mutex is held upon entry and exit.
3115
3116 */
3117 static void
3118 unblockThread(Capability *cap, StgTSO *tso)
3119 {
3120   StgBlockingQueueElement *t, **last;
3121
3122   switch (tso->why_blocked) {
3123
3124   case NotBlocked:
3125     return;  /* not blocked */
3126
3127   case BlockedOnSTM:
3128     // Be careful: nothing to do here!  We tell the scheduler that the thread
3129     // is runnable and we leave it to the stack-walking code to abort the 
3130     // transaction while unwinding the stack.  We should perhaps have a debugging
3131     // test to make sure that this really happens and that the 'zombie' transaction
3132     // does not get committed.
3133     goto done;
3134
3135   case BlockedOnMVar:
3136     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3137     {
3138       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3139       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3140
3141       last = (StgBlockingQueueElement **)&mvar->head;
3142       for (t = (StgBlockingQueueElement *)mvar->head; 
3143            t != END_BQ_QUEUE; 
3144            last = &t->link, last_tso = t, t = t->link) {
3145         if (t == (StgBlockingQueueElement *)tso) {
3146           *last = (StgBlockingQueueElement *)tso->link;
3147           if (mvar->tail == tso) {
3148             mvar->tail = (StgTSO *)last_tso;
3149           }
3150           goto done;
3151         }
3152       }
3153       barf("unblockThread (MVAR): TSO not found");
3154     }
3155
3156   case BlockedOnBlackHole:
3157     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3158     {
3159       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3160
3161       last = &bq->blocking_queue;
3162       for (t = bq->blocking_queue; 
3163            t != END_BQ_QUEUE; 
3164            last = &t->link, t = t->link) {
3165         if (t == (StgBlockingQueueElement *)tso) {
3166           *last = (StgBlockingQueueElement *)tso->link;
3167           goto done;
3168         }
3169       }
3170       barf("unblockThread (BLACKHOLE): TSO not found");
3171     }
3172
3173   case BlockedOnException:
3174     {
3175       StgTSO *target  = tso->block_info.tso;
3176
3177       ASSERT(get_itbl(target)->type == TSO);
3178
3179       if (target->what_next == ThreadRelocated) {
3180           target = target->link;
3181           ASSERT(get_itbl(target)->type == TSO);
3182       }
3183
3184       ASSERT(target->blocked_exceptions != NULL);
3185
3186       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3187       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3188            t != END_BQ_QUEUE; 
3189            last = &t->link, t = t->link) {
3190         ASSERT(get_itbl(t)->type == TSO);
3191         if (t == (StgBlockingQueueElement *)tso) {
3192           *last = (StgBlockingQueueElement *)tso->link;
3193           goto done;
3194         }
3195       }
3196       barf("unblockThread (Exception): TSO not found");
3197     }
3198
3199   case BlockedOnRead:
3200   case BlockedOnWrite:
3201 #if defined(mingw32_HOST_OS)
3202   case BlockedOnDoProc:
3203 #endif
3204     {
3205       /* take TSO off blocked_queue */
3206       StgBlockingQueueElement *prev = NULL;
3207       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3208            prev = t, t = t->link) {
3209         if (t == (StgBlockingQueueElement *)tso) {
3210           if (prev == NULL) {
3211             blocked_queue_hd = (StgTSO *)t->link;
3212             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3213               blocked_queue_tl = END_TSO_QUEUE;
3214             }
3215           } else {
3216             prev->link = t->link;
3217             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3218               blocked_queue_tl = (StgTSO *)prev;
3219             }
3220           }
3221 #if defined(mingw32_HOST_OS)
3222           /* (Cooperatively) signal that the worker thread should abort
3223            * the request.
3224            */
3225           abandonWorkRequest(tso->block_info.async_result->reqID);
3226 #endif
3227           goto done;
3228         }
3229       }
3230       barf("unblockThread (I/O): TSO not found");
3231     }
3232
3233   case BlockedOnDelay:
3234     {
3235       /* take TSO off sleeping_queue */
3236       StgBlockingQueueElement *prev = NULL;
3237       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3238            prev = t, t = t->link) {
3239         if (t == (StgBlockingQueueElement *)tso) {
3240           if (prev == NULL) {
3241             sleeping_queue = (StgTSO *)t->link;
3242           } else {
3243             prev->link = t->link;
3244           }
3245           goto done;
3246         }
3247       }
3248       barf("unblockThread (delay): TSO not found");
3249     }
3250
3251   default:
3252     barf("unblockThread");
3253   }
3254
3255  done:
3256   tso->link = END_TSO_QUEUE;
3257   tso->why_blocked = NotBlocked;
3258   tso->block_info.closure = NULL;
3259   pushOnRunQueue(cap,tso);
3260 }
3261 #else
3262 static void
3263 unblockThread(Capability *cap, StgTSO *tso)
3264 {
3265   StgTSO *t, **last;
3266   
3267   /* To avoid locking unnecessarily. */
3268   if (tso->why_blocked == NotBlocked) {
3269     return;
3270   }
3271
3272   switch (tso->why_blocked) {
3273
3274   case BlockedOnSTM:
3275     // Be careful: nothing to do here!  We tell the scheduler that the thread
3276     // is runnable and we leave it to the stack-walking code to abort the 
3277     // transaction while unwinding the stack.  We should perhaps have a debugging
3278     // test to make sure that this really happens and that the 'zombie' transaction
3279     // does not get committed.
3280     goto done;
3281
3282   case BlockedOnMVar:
3283     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3284     {
3285       StgTSO *last_tso = END_TSO_QUEUE;
3286       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3287
3288       last = &mvar->head;
3289       for (t = mvar->head; t != END_TSO_QUEUE; 
3290            last = &t->link, last_tso = t, t = t->link) {
3291         if (t == tso) {
3292           *last = tso->link;
3293           if (mvar->tail == tso) {
3294             mvar->tail = last_tso;
3295           }
3296           goto done;
3297         }
3298       }
3299       barf("unblockThread (MVAR): TSO not found");
3300     }
3301
3302   case BlockedOnBlackHole:
3303     {
3304       last = &blackhole_queue;
3305       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3306            last = &t->link, t = t->link) {
3307         if (t == tso) {
3308           *last = tso->link;
3309           goto done;
3310         }
3311       }
3312       barf("unblockThread (BLACKHOLE): TSO not found");
3313     }
3314
3315   case BlockedOnException:
3316     {
3317       StgTSO *target  = tso->block_info.tso;
3318
3319       ASSERT(get_itbl(target)->type == TSO);
3320
3321       while (target->what_next == ThreadRelocated) {
3322           target = target->link;
3323           ASSERT(get_itbl(target)->type == TSO);
3324       }
3325       
3326       ASSERT(target->blocked_exceptions != NULL);
3327
3328       last = &target->blocked_exceptions;
3329       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3330            last = &t->link, t = t->link) {
3331         ASSERT(get_itbl(t)->type == TSO);
3332         if (t == tso) {
3333           *last = tso->link;
3334           goto done;
3335         }
3336       }
3337       barf("unblockThread (Exception): TSO not found");
3338     }
3339
3340 #if !defined(THREADED_RTS)
3341   case BlockedOnRead:
3342   case BlockedOnWrite:
3343 #if defined(mingw32_HOST_OS)
3344   case BlockedOnDoProc:
3345 #endif
3346     {
3347       StgTSO *prev = NULL;
3348       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3349            prev = t, t = t->link) {
3350         if (t == tso) {
3351           if (prev == NULL) {
3352             blocked_queue_hd = t->link;
3353             if (blocked_queue_tl == t) {
3354               blocked_queue_tl = END_TSO_QUEUE;
3355             }
3356           } else {
3357             prev->link = t->link;
3358             if (blocked_queue_tl == t) {
3359               blocked_queue_tl = prev;
3360             }
3361           }
3362 #if defined(mingw32_HOST_OS)
3363           /* (Cooperatively) signal that the worker thread should abort
3364            * the request.
3365            */
3366           abandonWorkRequest(tso->block_info.async_result->reqID);
3367 #endif
3368           goto done;
3369         }
3370       }
3371       barf("unblockThread (I/O): TSO not found");
3372     }
3373
3374   case BlockedOnDelay:
3375     {
3376       StgTSO *prev = NULL;
3377       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3378            prev = t, t = t->link) {
3379         if (t == tso) {
3380           if (prev == NULL) {
3381             sleeping_queue = t->link;
3382           } else {
3383             prev->link = t->link;
3384           }
3385           goto done;
3386         }
3387       }
3388       barf("unblockThread (delay): TSO not found");
3389     }
3390 #endif
3391
3392   default:
3393     barf("unblockThread");
3394   }
3395
3396  done:
3397   tso->link = END_TSO_QUEUE;
3398   tso->why_blocked = NotBlocked;
3399   tso->block_info.closure = NULL;
3400   appendToRunQueue(cap,tso);
3401 }
3402 #endif
3403
3404 /* -----------------------------------------------------------------------------
3405  * checkBlackHoles()
3406  *
3407  * Check the blackhole_queue for threads that can be woken up.  We do
3408  * this periodically: before every GC, and whenever the run queue is
3409  * empty.
3410  *
3411  * An elegant solution might be to just wake up all the blocked
3412  * threads with awakenBlockedQueue occasionally: they'll go back to
3413  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3414  * doesn't give us a way to tell whether we've actually managed to
3415  * wake up any threads, so we would be busy-waiting.
3416  *
3417  * -------------------------------------------------------------------------- */
3418
3419 static rtsBool
3420 checkBlackHoles (Capability *cap)
3421 {
3422     StgTSO **prev, *t;
3423     rtsBool any_woke_up = rtsFalse;
3424     StgHalfWord type;
3425
3426     // blackhole_queue is global:
3427     ASSERT_LOCK_HELD(&sched_mutex);
3428
3429     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3430
3431     // ASSUMES: sched_mutex
3432     prev = &blackhole_queue;
3433     t = blackhole_queue;
3434     while (t != END_TSO_QUEUE) {
3435         ASSERT(t->why_blocked == BlockedOnBlackHole);
3436         type = get_itbl(t->block_info.closure)->type;
3437         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3438             IF_DEBUG(sanity,checkTSO(t));
3439             t = unblockOne(cap, t);
3440             // urk, the threads migrate to the current capability
3441             // here, but we'd like to keep them on the original one.
3442             *prev = t;
3443             any_woke_up = rtsTrue;
3444         } else {
3445             prev = &t->link;
3446             t = t->link;
3447         }
3448     }
3449
3450     return any_woke_up;
3451 }
3452
3453 /* -----------------------------------------------------------------------------
3454  * raiseAsync()
3455  *
3456  * The following function implements the magic for raising an
3457  * asynchronous exception in an existing thread.
3458  *
3459  * We first remove the thread from any queue on which it might be
3460  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3461  *
3462  * We strip the stack down to the innermost CATCH_FRAME, building
3463  * thunks in the heap for all the active computations, so they can 
3464  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3465  * an application of the handler to the exception, and push it on
3466  * the top of the stack.
3467  * 
3468  * How exactly do we save all the active computations?  We create an
3469  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3470  * AP_STACKs pushes everything from the corresponding update frame
3471  * upwards onto the stack.  (Actually, it pushes everything up to the
3472  * next update frame plus a pointer to the next AP_STACK object.
3473  * Entering the next AP_STACK object pushes more onto the stack until we
3474  * reach the last AP_STACK object - at which point the stack should look
3475  * exactly as it did when we killed the TSO and we can continue
3476  * execution by entering the closure on top of the stack.
3477  *
3478  * We can also kill a thread entirely - this happens if either (a) the 
3479  * exception passed to raiseAsync is NULL, or (b) there's no
3480  * CATCH_FRAME on the stack.  In either case, we strip the entire
3481  * stack and replace the thread with a zombie.
3482  *
3483  * ToDo: in SMP mode, this function is only safe if either (a) we hold
3484  * all the Capabilities (eg. in GC), or (b) we own the Capability that
3485  * the TSO is currently blocked on or on the run queue of.
3486  *
3487  * -------------------------------------------------------------------------- */
3488  
3489 void
3490 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3491 {
3492     raiseAsync_(cap, tso, exception, rtsFalse);
3493 }
3494
3495 static void
3496 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3497             rtsBool stop_at_atomically)
3498 {
3499     StgRetInfoTable *info;
3500     StgPtr sp;
3501   
3502     // Thread already dead?
3503     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3504         return;
3505     }
3506
3507     IF_DEBUG(scheduler, 
3508              sched_belch("raising exception in thread %ld.", (long)tso->id));
3509     
3510     // Remove it from any blocking queues
3511     unblockThread(cap,tso);
3512
3513     sp = tso->sp;
3514     
3515     // The stack freezing code assumes there's a closure pointer on
3516     // the top of the stack, so we have to arrange that this is the case...
3517     //
3518     if (sp[0] == (W_)&stg_enter_info) {
3519         sp++;
3520     } else {
3521         sp--;
3522         sp[0] = (W_)&stg_dummy_ret_closure;
3523     }
3524
3525     while (1) {
3526         nat i;
3527
3528         // 1. Let the top of the stack be the "current closure"
3529         //
3530         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3531         // CATCH_FRAME.
3532         //
3533         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3534         // current closure applied to the chunk of stack up to (but not
3535         // including) the update frame.  This closure becomes the "current
3536         // closure".  Go back to step 2.
3537         //
3538         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3539         // top of the stack applied to the exception.
3540         // 
3541         // 5. If it's a STOP_FRAME, then kill the thread.
3542         // 
3543         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3544         // transaction
3545        
3546         
3547         StgPtr frame;
3548         
3549         frame = sp + 1;
3550         info = get_ret_itbl((StgClosure *)frame);
3551         
3552         while (info->i.type != UPDATE_FRAME
3553                && (info->i.type != CATCH_FRAME || exception == NULL)
3554                && info->i.type != STOP_FRAME
3555                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3556         {
3557             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3558               // IF we find an ATOMICALLY_FRAME then we abort the
3559               // current transaction and propagate the exception.  In
3560               // this case (unlike ordinary exceptions) we do not care
3561               // whether the transaction is valid or not because its
3562               // possible validity cannot have caused the exception
3563               // and will not be visible after the abort.
3564               IF_DEBUG(stm,
3565                        debugBelch("Found atomically block delivering async exception\n"));
3566               stmAbortTransaction(tso -> trec);
3567               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3568             }
3569             frame += stack_frame_sizeW((StgClosure *)frame);
3570             info = get_ret_itbl((StgClosure *)frame);
3571         }
3572         
3573         switch (info->i.type) {
3574             
3575         case ATOMICALLY_FRAME:
3576             ASSERT(stop_at_atomically);
3577             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3578             stmCondemnTransaction(tso -> trec);
3579 #ifdef REG_R1
3580             tso->sp = frame;
3581 #else
3582             // R1 is not a register: the return convention for IO in
3583             // this case puts the return value on the stack, so we
3584             // need to set up the stack to return to the atomically
3585             // frame properly...
3586             tso->sp = frame - 2;
3587             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3588             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3589 #endif
3590             tso->what_next = ThreadRunGHC;
3591             return;
3592
3593         case CATCH_FRAME:
3594             // If we find a CATCH_FRAME, and we've got an exception to raise,
3595             // then build the THUNK raise(exception), and leave it on
3596             // top of the CATCH_FRAME ready to enter.
3597             //
3598         {
3599 #ifdef PROFILING
3600             StgCatchFrame *cf = (StgCatchFrame *)frame;
3601 #endif
3602             StgThunk *raise;
3603             
3604             // we've got an exception to raise, so let's pass it to the
3605             // handler in this frame.
3606             //
3607             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3608             TICK_ALLOC_SE_THK(1,0);
3609             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3610             raise->payload[0] = exception;
3611             
3612             // throw away the stack from Sp up to the CATCH_FRAME.
3613             //
3614             sp = frame - 1;
3615             
3616             /* Ensure that async excpetions are blocked now, so we don't get
3617              * a surprise exception before we get around to executing the
3618              * handler.
3619              */
3620             if (tso->blocked_exceptions == NULL) {
3621                 tso->blocked_exceptions = END_TSO_QUEUE;
3622             }
3623             
3624             /* Put the newly-built THUNK on top of the stack, ready to execute
3625              * when the thread restarts.
3626              */
3627             sp[0] = (W_)raise;
3628             sp[-1] = (W_)&stg_enter_info;
3629             tso->sp = sp-1;
3630             tso->what_next = ThreadRunGHC;
3631             IF_DEBUG(sanity, checkTSO(tso));
3632             return;
3633         }
3634         
3635         case UPDATE_FRAME:
3636         {
3637             StgAP_STACK * ap;
3638             nat words;
3639             
3640             // First build an AP_STACK consisting of the stack chunk above the
3641             // current update frame, with the top word on the stack as the
3642             // fun field.
3643             //
3644             words = frame - sp - 1;
3645             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3646             
3647             ap->size = words;
3648             ap->fun  = (StgClosure *)sp[0];
3649             sp++;
3650             for(i=0; i < (nat)words; ++i) {
3651                 ap->payload[i] = (StgClosure *)*sp++;
3652             }
3653             
3654             SET_HDR(ap,&stg_AP_STACK_info,
3655                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3656             TICK_ALLOC_UP_THK(words+1,0);
3657             
3658             IF_DEBUG(scheduler,
3659                      debugBelch("sched: Updating ");
3660                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3661                      debugBelch(" with ");
3662                      printObj((StgClosure *)ap);
3663                 );
3664
3665             // Replace the updatee with an indirection - happily
3666             // this will also wake up any threads currently
3667             // waiting on the result.
3668             //
3669             // Warning: if we're in a loop, more than one update frame on
3670             // the stack may point to the same object.  Be careful not to
3671             // overwrite an IND_OLDGEN in this case, because we'll screw
3672             // up the mutable lists.  To be on the safe side, don't
3673             // overwrite any kind of indirection at all.  See also
3674             // threadSqueezeStack in GC.c, where we have to make a similar
3675             // check.
3676             //
3677             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3678                 // revert the black hole
3679                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3680                                (StgClosure *)ap);
3681             }
3682             sp += sizeofW(StgUpdateFrame) - 1;
3683             sp[0] = (W_)ap; // push onto stack
3684             break;
3685         }
3686         
3687         case STOP_FRAME:
3688             // We've stripped the entire stack, the thread is now dead.
3689             sp += sizeofW(StgStopFrame);
3690             tso->what_next = ThreadKilled;
3691             tso->sp = sp;
3692             return;
3693             
3694         default:
3695             barf("raiseAsync");
3696         }
3697     }
3698     barf("raiseAsync");
3699 }
3700
3701 /* -----------------------------------------------------------------------------
3702    Deleting threads
3703
3704    This is used for interruption (^C) and forking, and corresponds to
3705    raising an exception but without letting the thread catch the
3706    exception.
3707    -------------------------------------------------------------------------- */
3708
3709 static void 
3710 deleteThread (Capability *cap, StgTSO *tso)
3711 {
3712   if (tso->why_blocked != BlockedOnCCall &&
3713       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3714       raiseAsync(cap,tso,NULL);
3715   }
3716 }
3717
3718 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3719 static void 
3720 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3721 { // for forkProcess only:
3722   // delete thread without giving it a chance to catch the KillThread exception
3723
3724   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3725       return;
3726   }
3727
3728   if (tso->why_blocked != BlockedOnCCall &&
3729       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3730       unblockThread(cap,tso);
3731   }
3732
3733   tso->what_next = ThreadKilled;
3734 }
3735 #endif
3736
3737 /* -----------------------------------------------------------------------------
3738    raiseExceptionHelper
3739    
3740    This function is called by the raise# primitve, just so that we can
3741    move some of the tricky bits of raising an exception from C-- into
3742    C.  Who knows, it might be a useful re-useable thing here too.
3743    -------------------------------------------------------------------------- */
3744
3745 StgWord
3746 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3747 {
3748     Capability *cap = regTableToCapability(reg);
3749     StgThunk *raise_closure = NULL;
3750     StgPtr p, next;
3751     StgRetInfoTable *info;
3752     //
3753     // This closure represents the expression 'raise# E' where E
3754     // is the exception raise.  It is used to overwrite all the
3755     // thunks which are currently under evaluataion.
3756     //
3757
3758     //    
3759     // LDV profiling: stg_raise_info has THUNK as its closure
3760     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3761     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3762     // 1 does not cause any problem unless profiling is performed.
3763     // However, when LDV profiling goes on, we need to linearly scan
3764     // small object pool, where raise_closure is stored, so we should
3765     // use MIN_UPD_SIZE.
3766     //
3767     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3768     //                                 sizeofW(StgClosure)+1);
3769     //
3770
3771     //
3772     // Walk up the stack, looking for the catch frame.  On the way,
3773     // we update any closures pointed to from update frames with the
3774     // raise closure that we just built.
3775     //
3776     p = tso->sp;
3777     while(1) {
3778         info = get_ret_itbl((StgClosure *)p);
3779         next = p + stack_frame_sizeW((StgClosure *)p);
3780         switch (info->i.type) {
3781             
3782         case UPDATE_FRAME:
3783             // Only create raise_closure if we need to.
3784             if (raise_closure == NULL) {
3785                 raise_closure = 
3786                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3787                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3788                 raise_closure->payload[0] = exception;
3789             }
3790             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3791             p = next;
3792             continue;
3793
3794         case ATOMICALLY_FRAME:
3795             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3796             tso->sp = p;
3797             return ATOMICALLY_FRAME;
3798             
3799         case CATCH_FRAME:
3800             tso->sp = p;
3801             return CATCH_FRAME;
3802
3803         case CATCH_STM_FRAME:
3804             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3805             tso->sp = p;
3806             return CATCH_STM_FRAME;
3807             
3808         case STOP_FRAME:
3809             tso->sp = p;
3810             return STOP_FRAME;
3811
3812         case CATCH_RETRY_FRAME:
3813         default:
3814             p = next; 
3815             continue;
3816         }
3817     }
3818 }
3819
3820
3821 /* -----------------------------------------------------------------------------
3822    findRetryFrameHelper
3823
3824    This function is called by the retry# primitive.  It traverses the stack
3825    leaving tso->sp referring to the frame which should handle the retry.  
3826
3827    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3828    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3829
3830    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3831    despite the similar implementation.
3832
3833    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3834    not be created within memory transactions.
3835    -------------------------------------------------------------------------- */
3836
3837 StgWord
3838 findRetryFrameHelper (StgTSO *tso)
3839 {
3840   StgPtr           p, next;
3841   StgRetInfoTable *info;
3842
3843   p = tso -> sp;
3844   while (1) {
3845     info = get_ret_itbl((StgClosure *)p);
3846     next = p + stack_frame_sizeW((StgClosure *)p);
3847     switch (info->i.type) {
3848       
3849     case ATOMICALLY_FRAME:
3850       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3851       tso->sp = p;
3852       return ATOMICALLY_FRAME;
3853       
3854     case CATCH_RETRY_FRAME:
3855       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3856       tso->sp = p;
3857       return CATCH_RETRY_FRAME;
3858       
3859     case CATCH_STM_FRAME:
3860     default:
3861       ASSERT(info->i.type != CATCH_FRAME);
3862       ASSERT(info->i.type != STOP_FRAME);
3863       p = next; 
3864       continue;
3865     }
3866   }
3867 }
3868
3869 /* -----------------------------------------------------------------------------
3870    resurrectThreads is called after garbage collection on the list of
3871    threads found to be garbage.  Each of these threads will be woken
3872    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3873    on an MVar, or NonTermination if the thread was blocked on a Black
3874    Hole.
3875
3876    Locks: assumes we hold *all* the capabilities.
3877    -------------------------------------------------------------------------- */
3878
3879 void
3880 resurrectThreads (StgTSO *threads)
3881 {
3882     StgTSO *tso, *next;
3883     Capability *cap;
3884
3885     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3886         next = tso->global_link;
3887         tso->global_link = all_threads;
3888         all_threads = tso;
3889         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3890         
3891         // Wake up the thread on the Capability it was last on for a
3892         // bound thread, or last_free_capability otherwise.
3893         if (tso->bound) {
3894             cap = tso->bound->cap;
3895         } else {
3896             cap = last_free_capability;
3897         }
3898         
3899         switch (tso->why_blocked) {
3900         case BlockedOnMVar:
3901         case BlockedOnException:
3902             /* Called by GC - sched_mutex lock is currently held. */
3903             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
3904             break;
3905         case BlockedOnBlackHole:
3906             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
3907             break;
3908         case BlockedOnSTM:
3909             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
3910             break;
3911         case NotBlocked:
3912             /* This might happen if the thread was blocked on a black hole
3913              * belonging to a thread that we've just woken up (raiseAsync
3914              * can wake up threads, remember...).
3915              */
3916             continue;
3917         default:
3918             barf("resurrectThreads: thread blocked in a strange way");
3919         }
3920     }
3921 }
3922
3923 /* ----------------------------------------------------------------------------
3924  * Debugging: why is a thread blocked
3925  * [Also provides useful information when debugging threaded programs
3926  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3927    ------------------------------------------------------------------------- */
3928
3929 #if DEBUG
3930 static void
3931 printThreadBlockage(StgTSO *tso)
3932 {
3933   switch (tso->why_blocked) {
3934   case BlockedOnRead:
3935     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
3936     break;
3937   case BlockedOnWrite:
3938     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
3939     break;
3940 #if defined(mingw32_HOST_OS)
3941     case BlockedOnDoProc:
3942     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
3943     break;
3944 #endif
3945   case BlockedOnDelay:
3946     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
3947     break;
3948   case BlockedOnMVar:
3949     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
3950     break;
3951   case BlockedOnException:
3952     debugBelch("is blocked on delivering an exception to thread %d",
3953             tso->block_info.tso->id);
3954     break;
3955   case BlockedOnBlackHole:
3956     debugBelch("is blocked on a black hole");
3957     break;
3958   case NotBlocked:
3959     debugBelch("is not blocked");
3960     break;
3961 #if defined(PARALLEL_HASKELL)
3962   case BlockedOnGA:
3963     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
3964             tso->block_info.closure, info_type(tso->block_info.closure));
3965     break;
3966   case BlockedOnGA_NoSend:
3967     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
3968             tso->block_info.closure, info_type(tso->block_info.closure));
3969     break;
3970 #endif
3971   case BlockedOnCCall:
3972     debugBelch("is blocked on an external call");
3973     break;
3974   case BlockedOnCCall_NoUnblockExc:
3975     debugBelch("is blocked on an external call (exceptions were already blocked)");
3976     break;
3977   case BlockedOnSTM:
3978     debugBelch("is blocked on an STM operation");
3979     break;
3980   default:
3981     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3982          tso->why_blocked, tso->id, tso);
3983   }
3984 }
3985
3986 static void
3987 printThreadStatus(StgTSO *tso)
3988 {
3989   switch (tso->what_next) {
3990   case ThreadKilled:
3991     debugBelch("has been killed");
3992     break;
3993   case ThreadComplete:
3994     debugBelch("has completed");
3995     break;
3996   default:
3997     printThreadBlockage(tso);
3998   }
3999 }
4000
4001 void
4002 printAllThreads(void)
4003 {
4004   StgTSO *t;
4005
4006 # if defined(GRAN)
4007   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4008   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4009                        time_string, rtsFalse/*no commas!*/);
4010
4011   debugBelch("all threads at [%s]:\n", time_string);
4012 # elif defined(PARALLEL_HASKELL)
4013   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4014   ullong_format_string(CURRENT_TIME,
4015                        time_string, rtsFalse/*no commas!*/);
4016
4017   debugBelch("all threads at [%s]:\n", time_string);
4018 # else
4019   debugBelch("all threads:\n");
4020 # endif
4021
4022   for (t = all_threads; t != END_TSO_QUEUE; ) {
4023     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4024     {
4025       void *label = lookupThreadLabel(t->id);
4026       if (label) debugBelch("[\"%s\"] ",(char *)label);
4027     }
4028     if (t->what_next == ThreadRelocated) {
4029         debugBelch("has been relocated...\n");
4030         t = t->link;
4031     } else {
4032         printThreadStatus(t);
4033         debugBelch("\n");
4034         t = t->global_link;
4035     }
4036   }
4037 }
4038
4039 // useful from gdb
4040 void 
4041 printThreadQueue(StgTSO *t)
4042 {
4043     nat i = 0;
4044     for (; t != END_TSO_QUEUE; t = t->link) {
4045         debugBelch("\tthread %d @ %p ", t->id, (void *)t);
4046         if (t->what_next == ThreadRelocated) {
4047             debugBelch("has been relocated...\n");
4048         } else {
4049             printThreadStatus(t);
4050             debugBelch("\n");
4051         }
4052         i++;
4053     }
4054     debugBelch("%d threads on queue\n", i);
4055 }
4056
4057 /* 
4058    Print a whole blocking queue attached to node (debugging only).
4059 */
4060 # if defined(PARALLEL_HASKELL)
4061 void 
4062 print_bq (StgClosure *node)
4063 {
4064   StgBlockingQueueElement *bqe;
4065   StgTSO *tso;
4066   rtsBool end;
4067
4068   debugBelch("## BQ of closure %p (%s): ",
4069           node, info_type(node));
4070
4071   /* should cover all closures that may have a blocking queue */
4072   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4073          get_itbl(node)->type == FETCH_ME_BQ ||
4074          get_itbl(node)->type == RBH ||
4075          get_itbl(node)->type == MVAR);
4076     
4077   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4078
4079   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4080 }
4081
4082 /* 
4083    Print a whole blocking queue starting with the element bqe.
4084 */
4085 void 
4086 print_bqe (StgBlockingQueueElement *bqe)
4087 {
4088   rtsBool end;
4089
4090   /* 
4091      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4092   */
4093   for (end = (bqe==END_BQ_QUEUE);
4094        !end; // iterate until bqe points to a CONSTR
4095        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4096        bqe = end ? END_BQ_QUEUE : bqe->link) {
4097     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4098     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4099     /* types of closures that may appear in a blocking queue */
4100     ASSERT(get_itbl(bqe)->type == TSO ||           
4101            get_itbl(bqe)->type == BLOCKED_FETCH || 
4102            get_itbl(bqe)->type == CONSTR); 
4103     /* only BQs of an RBH end with an RBH_Save closure */
4104     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4105
4106     switch (get_itbl(bqe)->type) {
4107     case TSO:
4108       debugBelch(" TSO %u (%x),",
4109               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4110       break;
4111     case BLOCKED_FETCH:
4112       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4113               ((StgBlockedFetch *)bqe)->node, 
4114               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4115               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4116               ((StgBlockedFetch *)bqe)->ga.weight);
4117       break;
4118     case CONSTR:
4119       debugBelch(" %s (IP %p),",
4120               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4121                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4122                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4123                "RBH_Save_?"), get_itbl(bqe));
4124       break;
4125     default:
4126       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4127            info_type((StgClosure *)bqe)); // , node, info_type(node));
4128       break;
4129     }
4130   } /* for */
4131   debugBelch("\n");
4132 }
4133 # elif defined(GRAN)
4134 void 
4135 print_bq (StgClosure *node)
4136 {
4137   StgBlockingQueueElement *bqe;
4138   PEs node_loc, tso_loc;
4139   rtsBool end;
4140
4141   /* should cover all closures that may have a blocking queue */
4142   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4143          get_itbl(node)->type == FETCH_ME_BQ ||
4144          get_itbl(node)->type == RBH);
4145     
4146   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4147   node_loc = where_is(node);
4148
4149   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4150           node, info_type(node), node_loc);
4151
4152   /* 
4153      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4154   */
4155   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4156        !end; // iterate until bqe points to a CONSTR
4157        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4158     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4159     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4160     /* types of closures that may appear in a blocking queue */
4161     ASSERT(get_itbl(bqe)->type == TSO ||           
4162            get_itbl(bqe)->type == CONSTR); 
4163     /* only BQs of an RBH end with an RBH_Save closure */
4164     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4165
4166     tso_loc = where_is((StgClosure *)bqe);
4167     switch (get_itbl(bqe)->type) {
4168     case TSO:
4169       debugBelch(" TSO %d (%p) on [PE %d],",
4170               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4171       break;
4172     case CONSTR:
4173       debugBelch(" %s (IP %p),",
4174               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4175                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4176                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4177                "RBH_Save_?"), get_itbl(bqe));
4178       break;
4179     default:
4180       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4181            info_type((StgClosure *)bqe), node, info_type(node));
4182       break;
4183     }
4184   } /* for */
4185   debugBelch("\n");
4186 }
4187 # endif
4188
4189 #if defined(PARALLEL_HASKELL)
4190 static nat
4191 run_queue_len(void)
4192 {
4193     nat i;
4194     StgTSO *tso;
4195     
4196     for (i=0, tso=run_queue_hd; 
4197          tso != END_TSO_QUEUE;
4198          i++, tso=tso->link) {
4199         /* nothing */
4200     }
4201         
4202     return i;
4203 }
4204 #endif
4205
4206 void
4207 sched_belch(char *s, ...)
4208 {
4209     va_list ap;
4210     va_start(ap,s);
4211 #ifdef THREADED_RTS
4212     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4213 #elif defined(PARALLEL_HASKELL)
4214     debugBelch("== ");
4215 #else
4216     debugBelch("sched: ");
4217 #endif
4218     vdebugBelch(s, ap);
4219     debugBelch("\n");
4220     va_end(ap);
4221 }
4222
4223 #endif /* DEBUG */