[project @ 2005-10-21 14:02:17 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50
51 #ifdef HAVE_SYS_TYPES_H
52 #include <sys/types.h>
53 #endif
54 #ifdef HAVE_UNISTD_H
55 #include <unistd.h>
56 #endif
57
58 #include <string.h>
59 #include <stdlib.h>
60 #include <stdarg.h>
61
62 #ifdef HAVE_ERRNO_H
63 #include <errno.h>
64 #endif
65
66 // Turn off inlining when debugging - it obfuscates things
67 #ifdef DEBUG
68 # undef  STATIC_INLINE
69 # define STATIC_INLINE static
70 #endif
71
72 #ifdef THREADED_RTS
73 #define USED_WHEN_THREADED_RTS
74 #define USED_WHEN_NON_THREADED_RTS STG_UNUSED
75 #else
76 #define USED_WHEN_THREADED_RTS     STG_UNUSED
77 #define USED_WHEN_NON_THREADED_RTS
78 #endif
79
80 #ifdef SMP
81 #define USED_WHEN_SMP
82 #else
83 #define USED_WHEN_SMP STG_UNUSED
84 #endif
85
86 /* -----------------------------------------------------------------------------
87  * Global variables
88  * -------------------------------------------------------------------------- */
89
90 #if defined(GRAN)
91
92 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
93 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
94
95 /* 
96    In GranSim we have a runnable and a blocked queue for each processor.
97    In order to minimise code changes new arrays run_queue_hds/tls
98    are created. run_queue_hd is then a short cut (macro) for
99    run_queue_hds[CurrentProc] (see GranSim.h).
100    -- HWL
101 */
102 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
103 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
104 StgTSO *ccalling_threadss[MAX_PROC];
105 /* We use the same global list of threads (all_threads) in GranSim as in
106    the std RTS (i.e. we are cheating). However, we don't use this list in
107    the GranSim specific code at the moment (so we are only potentially
108    cheating).  */
109
110 #else /* !GRAN */
111
112 #if !defined(THREADED_RTS)
113 // Blocked/sleeping thrads
114 StgTSO *blocked_queue_hd = NULL;
115 StgTSO *blocked_queue_tl = NULL;
116 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
117 #endif
118
119 /* Threads blocked on blackholes.
120  * LOCK: sched_mutex+capability, or all capabilities
121  */
122 StgTSO *blackhole_queue = NULL;
123 #endif
124
125 /* The blackhole_queue should be checked for threads to wake up.  See
126  * Schedule.h for more thorough comment.
127  * LOCK: none (doesn't matter if we miss an update)
128  */
129 rtsBool blackholes_need_checking = rtsFalse;
130
131 /* Linked list of all threads.
132  * Used for detecting garbage collected threads.
133  * LOCK: sched_mutex+capability, or all capabilities
134  */
135 StgTSO *all_threads = NULL;
136
137 /* flag set by signal handler to precipitate a context switch
138  * LOCK: none (just an advisory flag)
139  */
140 int context_switch = 0;
141
142 /* flag that tracks whether we have done any execution in this time slice.
143  * LOCK: currently none, perhaps we should lock (but needs to be
144  * updated in the fast path of the scheduler).
145  */
146 nat recent_activity = ACTIVITY_YES;
147
148 /* if this flag is set as well, give up execution
149  * LOCK: none (changes once, from false->true)
150  */
151 rtsBool interrupted = rtsFalse;
152
153 /* Next thread ID to allocate.
154  * LOCK: sched_mutex
155  */
156 static StgThreadID next_thread_id = 1;
157
158 /* The smallest stack size that makes any sense is:
159  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
160  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
161  *  + 1                       (the closure to enter)
162  *  + 1                       (stg_ap_v_ret)
163  *  + 1                       (spare slot req'd by stg_ap_v_ret)
164  *
165  * A thread with this stack will bomb immediately with a stack
166  * overflow, which will increase its stack size.  
167  */
168 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
169
170 #if defined(GRAN)
171 StgTSO *CurrentTSO;
172 #endif
173
174 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
175  *  exists - earlier gccs apparently didn't.
176  *  -= chak
177  */
178 StgTSO dummy_tso;
179
180 /*
181  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
182  * in an MT setting, needed to signal that a worker thread shouldn't hang around
183  * in the scheduler when it is out of work.
184  */
185 rtsBool shutting_down_scheduler = rtsFalse;
186
187 /*
188  * This mutex protects most of the global scheduler data in
189  * the THREADED_RTS and (inc. SMP) runtime.
190  */
191 #if defined(THREADED_RTS)
192 Mutex sched_mutex = INIT_MUTEX_VAR;
193 #endif
194
195 #if defined(PARALLEL_HASKELL)
196 StgTSO *LastTSO;
197 rtsTime TimeOfLastYield;
198 rtsBool emitSchedule = rtsTrue;
199 #endif
200
201 /* -----------------------------------------------------------------------------
202  * static function prototypes
203  * -------------------------------------------------------------------------- */
204
205 static Capability *schedule (Capability *initialCapability, Task *task);
206
207 //
208 // These function all encapsulate parts of the scheduler loop, and are
209 // abstracted only to make the structure and control flow of the
210 // scheduler clearer.
211 //
212 static void schedulePreLoop (void);
213 static void scheduleStartSignalHandlers (void);
214 static void scheduleCheckBlockedThreads (Capability *cap);
215 static void scheduleCheckBlackHoles (Capability *cap);
216 static void scheduleDetectDeadlock (Capability *cap, Task *task);
217 #if defined(GRAN)
218 static StgTSO *scheduleProcessEvent(rtsEvent *event);
219 #endif
220 #if defined(PARALLEL_HASKELL)
221 static StgTSO *scheduleSendPendingMessages(void);
222 static void scheduleActivateSpark(void);
223 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
224 #endif
225 #if defined(PAR) || defined(GRAN)
226 static void scheduleGranParReport(void);
227 #endif
228 static void schedulePostRunThread(void);
229 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
230 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
231                                          StgTSO *t);
232 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
233                                     nat prev_what_next );
234 static void scheduleHandleThreadBlocked( StgTSO *t );
235 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
236                                              StgTSO *t );
237 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
238 static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);
239
240 static void unblockThread(Capability *cap, StgTSO *tso);
241 static rtsBool checkBlackHoles(Capability *cap);
242 static void AllRoots(evac_fn evac);
243
244 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
245
246 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
247                         rtsBool stop_at_atomically);
248
249 static void deleteThread (Capability *cap, StgTSO *tso);
250 static void deleteRunQueue (Capability *cap);
251
252 #ifdef DEBUG
253 static void printThreadBlockage(StgTSO *tso);
254 static void printThreadStatus(StgTSO *tso);
255 void printThreadQueue(StgTSO *tso);
256 #endif
257
258 #if defined(PARALLEL_HASKELL)
259 StgTSO * createSparkThread(rtsSpark spark);
260 StgTSO * activateSpark (rtsSpark spark);  
261 #endif
262
263 #ifdef DEBUG
264 static char *whatNext_strs[] = {
265   "(unknown)",
266   "ThreadRunGHC",
267   "ThreadInterpret",
268   "ThreadKilled",
269   "ThreadRelocated",
270   "ThreadComplete"
271 };
272 #endif
273
274 /* -----------------------------------------------------------------------------
275  * Putting a thread on the run queue: different scheduling policies
276  * -------------------------------------------------------------------------- */
277
278 STATIC_INLINE void
279 addToRunQueue( Capability *cap, StgTSO *t )
280 {
281 #if defined(PARALLEL_HASKELL)
282     if (RtsFlags.ParFlags.doFairScheduling) { 
283         // this does round-robin scheduling; good for concurrency
284         appendToRunQueue(cap,t);
285     } else {
286         // this does unfair scheduling; good for parallelism
287         pushOnRunQueue(cap,t);
288     }
289 #else
290     // this does round-robin scheduling; good for concurrency
291     appendToRunQueue(cap,t);
292 #endif
293 }
294
295 /* ---------------------------------------------------------------------------
296    Main scheduling loop.
297
298    We use round-robin scheduling, each thread returning to the
299    scheduler loop when one of these conditions is detected:
300
301       * out of heap space
302       * timer expires (thread yields)
303       * thread blocks
304       * thread ends
305       * stack overflow
306
307    GRAN version:
308      In a GranSim setup this loop iterates over the global event queue.
309      This revolves around the global event queue, which determines what 
310      to do next. Therefore, it's more complicated than either the 
311      concurrent or the parallel (GUM) setup.
312
313    GUM version:
314      GUM iterates over incoming messages.
315      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
316      and sends out a fish whenever it has nothing to do; in-between
317      doing the actual reductions (shared code below) it processes the
318      incoming messages and deals with delayed operations 
319      (see PendingFetches).
320      This is not the ugliest code you could imagine, but it's bloody close.
321
322    ------------------------------------------------------------------------ */
323
324 static Capability *
325 schedule (Capability *initialCapability, Task *task)
326 {
327   StgTSO *t;
328   Capability *cap;
329   StgThreadReturnCode ret;
330 #if defined(GRAN)
331   rtsEvent *event;
332 #elif defined(PARALLEL_HASKELL)
333   StgTSO *tso;
334   GlobalTaskId pe;
335   rtsBool receivedFinish = rtsFalse;
336 # if defined(DEBUG)
337   nat tp_size, sp_size; // stats only
338 # endif
339 #endif
340   nat prev_what_next;
341   rtsBool ready_to_gc;
342   rtsBool first = rtsTrue;
343   
344   cap = initialCapability;
345
346   // Pre-condition: this task owns initialCapability.
347   // The sched_mutex is *NOT* held
348   // NB. on return, we still hold a capability.
349
350   IF_DEBUG(scheduler,
351            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
352                        task, initialCapability);
353       );
354
355   schedulePreLoop();
356
357   // -----------------------------------------------------------
358   // Scheduler loop starts here:
359
360 #if defined(PARALLEL_HASKELL)
361 #define TERMINATION_CONDITION        (!receivedFinish)
362 #elif defined(GRAN)
363 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
364 #else
365 #define TERMINATION_CONDITION        rtsTrue
366 #endif
367
368   while (TERMINATION_CONDITION) {
369
370       ASSERT(cap->running_task == task);
371       ASSERT(task->cap == cap);
372       ASSERT(myTask() == task);
373
374 #if defined(GRAN)
375       /* Choose the processor with the next event */
376       CurrentProc = event->proc;
377       CurrentTSO = event->tso;
378 #endif
379
380 #if defined(THREADED_RTS)
381       if (first) {
382           // don't yield the first time, we want a chance to run this
383           // thread for a bit, even if there are others banging at the
384           // door.
385           first = rtsFalse;
386       } else {
387           // Yield the capability to higher-priority tasks if necessary.
388           yieldCapability(&cap, task);
389       }
390 #endif
391
392     // Check whether we have re-entered the RTS from Haskell without
393     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
394     // call).
395     if (cap->in_haskell) {
396           errorBelch("schedule: re-entered unsafely.\n"
397                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
398           stg_exit(EXIT_FAILURE);
399     }
400
401     //
402     // Test for interruption.  If interrupted==rtsTrue, then either
403     // we received a keyboard interrupt (^C), or the scheduler is
404     // trying to shut down all the tasks (shutting_down_scheduler) in
405     // the threaded RTS.
406     //
407     if (interrupted) {
408         deleteRunQueue(cap);
409         if (shutting_down_scheduler) {
410             IF_DEBUG(scheduler, sched_belch("shutting down"));
411             if (task->tso) { // we are bound
412                 task->stat = Interrupted;
413                 task->ret  = NULL;
414             }
415             return cap;
416         } else {
417             IF_DEBUG(scheduler, sched_belch("interrupted"));
418         }
419     }
420
421 #if defined(not_yet) && defined(SMP)
422     //
423     // Top up the run queue from our spark pool.  We try to make the
424     // number of threads in the run queue equal to the number of
425     // free capabilities.
426     //
427     {
428         StgClosure *spark;
429         if (emptyRunQueue()) {
430             spark = findSpark(rtsFalse);
431             if (spark == NULL) {
432                 break; /* no more sparks in the pool */
433             } else {
434                 createSparkThread(spark);         
435                 IF_DEBUG(scheduler,
436                          sched_belch("==^^ turning spark of closure %p into a thread",
437                                      (StgClosure *)spark));
438             }
439         }
440     }
441 #endif // SMP
442
443     scheduleStartSignalHandlers();
444
445     // Only check the black holes here if we've nothing else to do.
446     // During normal execution, the black hole list only gets checked
447     // at GC time, to avoid repeatedly traversing this possibly long
448     // list each time around the scheduler.
449     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
450
451     scheduleCheckBlockedThreads(cap);
452
453     scheduleDetectDeadlock(cap,task);
454
455     // Normally, the only way we can get here with no threads to
456     // run is if a keyboard interrupt received during 
457     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
458     // Additionally, it is not fatal for the
459     // threaded RTS to reach here with no threads to run.
460     //
461     // win32: might be here due to awaitEvent() being abandoned
462     // as a result of a console event having been delivered.
463     if ( emptyRunQueue(cap) ) {
464 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
465         ASSERT(interrupted);
466 #endif
467         continue; // nothing to do
468     }
469
470 #if defined(PARALLEL_HASKELL)
471     scheduleSendPendingMessages();
472     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
473         continue;
474
475 #if defined(SPARKS)
476     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
477 #endif
478
479     /* If we still have no work we need to send a FISH to get a spark
480        from another PE */
481     if (emptyRunQueue(cap)) {
482         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
483         ASSERT(rtsFalse); // should not happen at the moment
484     }
485     // from here: non-empty run queue.
486     //  TODO: merge above case with this, only one call processMessages() !
487     if (PacketsWaiting()) {  /* process incoming messages, if
488                                 any pending...  only in else
489                                 because getRemoteWork waits for
490                                 messages as well */
491         receivedFinish = processMessages();
492     }
493 #endif
494
495 #if defined(GRAN)
496     scheduleProcessEvent(event);
497 #endif
498
499     // 
500     // Get a thread to run
501     //
502     t = popRunQueue(cap);
503
504 #if defined(GRAN) || defined(PAR)
505     scheduleGranParReport(); // some kind of debuging output
506 #else
507     // Sanity check the thread we're about to run.  This can be
508     // expensive if there is lots of thread switching going on...
509     IF_DEBUG(sanity,checkTSO(t));
510 #endif
511
512 #if defined(THREADED_RTS)
513     // Check whether we can run this thread in the current task.
514     // If not, we have to pass our capability to the right task.
515     {
516         Task *bound = t->bound;
517       
518         if (bound) {
519             if (bound == task) {
520                 IF_DEBUG(scheduler,
521                          sched_belch("### Running thread %d in bound thread",
522                                      t->id));
523                 // yes, the Haskell thread is bound to the current native thread
524             } else {
525                 IF_DEBUG(scheduler,
526                          sched_belch("### thread %d bound to another OS thread",
527                                      t->id));
528                 // no, bound to a different Haskell thread: pass to that thread
529                 pushOnRunQueue(cap,t);
530                 continue;
531             }
532         } else {
533             // The thread we want to run is unbound.
534             if (task->tso) { 
535                 IF_DEBUG(scheduler,
536                          sched_belch("### this OS thread cannot run thread %d", t->id));
537                 // no, the current native thread is bound to a different
538                 // Haskell thread, so pass it to any worker thread
539                 pushOnRunQueue(cap,t);
540                 continue; 
541             }
542         }
543     }
544 #endif
545
546     cap->r.rCurrentTSO = t;
547     
548     /* context switches are initiated by the timer signal, unless
549      * the user specified "context switch as often as possible", with
550      * +RTS -C0
551      */
552     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
553         && !emptyThreadQueues(cap)) {
554         context_switch = 1;
555     }
556          
557 run_thread:
558
559     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
560                               (long)t->id, whatNext_strs[t->what_next]));
561
562 #if defined(PROFILING)
563     startHeapProfTimer();
564 #endif
565
566     // ----------------------------------------------------------------------
567     // Run the current thread 
568
569     prev_what_next = t->what_next;
570
571     errno = t->saved_errno;
572     cap->in_haskell = rtsTrue;
573
574     recent_activity = ACTIVITY_YES;
575
576     switch (prev_what_next) {
577
578     case ThreadKilled:
579     case ThreadComplete:
580         /* Thread already finished, return to scheduler. */
581         ret = ThreadFinished;
582         break;
583
584     case ThreadRunGHC:
585         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
586         break;
587
588     case ThreadInterpret:
589         ret = interpretBCO(cap);
590         break;
591
592     default:
593       barf("schedule: invalid what_next field");
594     }
595
596     // in SMP mode, we might return with a different capability than
597     // we started with, if the Haskell thread made a foreign call.  So
598     // let's find out what our current Capability is:
599     cap = task->cap;
600
601     cap->in_haskell = rtsFalse;
602
603     // The TSO might have moved, eg. if it re-entered the RTS and a GC
604     // happened.  So find the new location:
605     t = cap->r.rCurrentTSO;
606
607     // And save the current errno in this thread.
608     t->saved_errno = errno;
609
610     // ----------------------------------------------------------------------
611     
612     // Costs for the scheduler are assigned to CCS_SYSTEM
613 #if defined(PROFILING)
614     stopHeapProfTimer();
615     CCCS = CCS_SYSTEM;
616 #endif
617     
618     // We have run some Haskell code: there might be blackhole-blocked
619     // threads to wake up now.
620     // Lock-free test here should be ok, we're just setting a flag.
621     if ( blackhole_queue != END_TSO_QUEUE ) {
622         blackholes_need_checking = rtsTrue;
623     }
624     
625 #if defined(THREADED_RTS)
626     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
627 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
628     IF_DEBUG(scheduler,debugBelch("sched: "););
629 #endif
630     
631     schedulePostRunThread();
632
633     ready_to_gc = rtsFalse;
634
635     switch (ret) {
636     case HeapOverflow:
637         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
638         break;
639
640     case StackOverflow:
641         scheduleHandleStackOverflow(cap,task,t);
642         break;
643
644     case ThreadYielding:
645         if (scheduleHandleYield(cap, t, prev_what_next)) {
646             // shortcut for switching between compiler/interpreter:
647             goto run_thread; 
648         }
649         break;
650
651     case ThreadBlocked:
652         scheduleHandleThreadBlocked(t);
653         break;
654
655     case ThreadFinished:
656         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
657         break;
658
659     default:
660       barf("schedule: invalid thread return code %d", (int)ret);
661     }
662
663     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
664     if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
665   } /* end of while() */
666
667   IF_PAR_DEBUG(verbose,
668                debugBelch("== Leaving schedule() after having received Finish\n"));
669 }
670
671 /* ----------------------------------------------------------------------------
672  * Setting up the scheduler loop
673  * ------------------------------------------------------------------------- */
674
675 static void
676 schedulePreLoop(void)
677 {
678 #if defined(GRAN) 
679     /* set up first event to get things going */
680     /* ToDo: assign costs for system setup and init MainTSO ! */
681     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
682               ContinueThread, 
683               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
684     
685     IF_DEBUG(gran,
686              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
687                         CurrentTSO);
688              G_TSO(CurrentTSO, 5));
689     
690     if (RtsFlags.GranFlags.Light) {
691         /* Save current time; GranSim Light only */
692         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
693     }      
694 #endif
695 }
696
697 /* ----------------------------------------------------------------------------
698  * Start any pending signal handlers
699  * ------------------------------------------------------------------------- */
700
701 static void
702 scheduleStartSignalHandlers(void)
703 {
704 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
705     if (signals_pending()) { // safe outside the lock
706         startSignalHandlers();
707     }
708 #endif
709 }
710
711 /* ----------------------------------------------------------------------------
712  * Check for blocked threads that can be woken up.
713  * ------------------------------------------------------------------------- */
714
715 static void
716 scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
717 {
718 #if !defined(THREADED_RTS)
719     //
720     // Check whether any waiting threads need to be woken up.  If the
721     // run queue is empty, and there are no other tasks running, we
722     // can wait indefinitely for something to happen.
723     //
724     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
725     {
726         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
727     }
728 #endif
729 }
730
731
732 /* ----------------------------------------------------------------------------
733  * Check for threads blocked on BLACKHOLEs that can be woken up
734  * ------------------------------------------------------------------------- */
735 static void
736 scheduleCheckBlackHoles (Capability *cap)
737 {
738     if ( blackholes_need_checking ) // check without the lock first
739     {
740         ACQUIRE_LOCK(&sched_mutex);
741         if ( blackholes_need_checking ) {
742             checkBlackHoles(cap);
743             blackholes_need_checking = rtsFalse;
744         }
745         RELEASE_LOCK(&sched_mutex);
746     }
747 }
748
749 /* ----------------------------------------------------------------------------
750  * Detect deadlock conditions and attempt to resolve them.
751  * ------------------------------------------------------------------------- */
752
753 static void
754 scheduleDetectDeadlock (Capability *cap, Task *task)
755 {
756
757 #if defined(PARALLEL_HASKELL)
758     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
759     return;
760 #endif
761
762     /* 
763      * Detect deadlock: when we have no threads to run, there are no
764      * threads blocked, waiting for I/O, or sleeping, and all the
765      * other tasks are waiting for work, we must have a deadlock of
766      * some description.
767      */
768     if ( emptyThreadQueues(cap) )
769     {
770 #if defined(THREADED_RTS)
771         /* 
772          * In the threaded RTS, we only check for deadlock if there
773          * has been no activity in a complete timeslice.  This means
774          * we won't eagerly start a full GC just because we don't have
775          * any threads to run currently.
776          */
777         if (recent_activity != ACTIVITY_INACTIVE) return;
778 #endif
779
780         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
781
782         // Garbage collection can release some new threads due to
783         // either (a) finalizers or (b) threads resurrected because
784         // they are unreachable and will therefore be sent an
785         // exception.  Any threads thus released will be immediately
786         // runnable.
787         scheduleDoGC( cap, task, rtsTrue/*force  major GC*/ );
788         recent_activity = ACTIVITY_DONE_GC;
789         
790         if ( !emptyRunQueue(cap) ) return;
791
792 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
793         /* If we have user-installed signal handlers, then wait
794          * for signals to arrive rather then bombing out with a
795          * deadlock.
796          */
797         if ( anyUserHandlers() ) {
798             IF_DEBUG(scheduler, 
799                      sched_belch("still deadlocked, waiting for signals..."));
800
801             awaitUserSignals();
802
803             if (signals_pending()) {
804                 startSignalHandlers();
805             }
806
807             // either we have threads to run, or we were interrupted:
808             ASSERT(!emptyRunQueue(cap) || interrupted);
809         }
810 #endif
811
812 #if !defined(THREADED_RTS)
813         /* Probably a real deadlock.  Send the current main thread the
814          * Deadlock exception.
815          */
816         if (task->tso) {
817             switch (task->tso->why_blocked) {
818             case BlockedOnSTM:
819             case BlockedOnBlackHole:
820             case BlockedOnException:
821             case BlockedOnMVar:
822                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
823                 return;
824             default:
825                 barf("deadlock: main thread blocked in a strange way");
826             }
827         }
828         return;
829 #endif
830     }
831 }
832
833 /* ----------------------------------------------------------------------------
834  * Process an event (GRAN only)
835  * ------------------------------------------------------------------------- */
836
837 #if defined(GRAN)
838 static StgTSO *
839 scheduleProcessEvent(rtsEvent *event)
840 {
841     StgTSO *t;
842
843     if (RtsFlags.GranFlags.Light)
844       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
845
846     /* adjust time based on time-stamp */
847     if (event->time > CurrentTime[CurrentProc] &&
848         event->evttype != ContinueThread)
849       CurrentTime[CurrentProc] = event->time;
850     
851     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
852     if (!RtsFlags.GranFlags.Light)
853       handleIdlePEs();
854
855     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
856
857     /* main event dispatcher in GranSim */
858     switch (event->evttype) {
859       /* Should just be continuing execution */
860     case ContinueThread:
861       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
862       /* ToDo: check assertion
863       ASSERT(run_queue_hd != (StgTSO*)NULL &&
864              run_queue_hd != END_TSO_QUEUE);
865       */
866       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
867       if (!RtsFlags.GranFlags.DoAsyncFetch &&
868           procStatus[CurrentProc]==Fetching) {
869         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
870               CurrentTSO->id, CurrentTSO, CurrentProc);
871         goto next_thread;
872       } 
873       /* Ignore ContinueThreads for completed threads */
874       if (CurrentTSO->what_next == ThreadComplete) {
875         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
876               CurrentTSO->id, CurrentTSO, CurrentProc);
877         goto next_thread;
878       } 
879       /* Ignore ContinueThreads for threads that are being migrated */
880       if (PROCS(CurrentTSO)==Nowhere) { 
881         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
882               CurrentTSO->id, CurrentTSO, CurrentProc);
883         goto next_thread;
884       }
885       /* The thread should be at the beginning of the run queue */
886       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
887         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
888               CurrentTSO->id, CurrentTSO, CurrentProc);
889         break; // run the thread anyway
890       }
891       /*
892       new_event(proc, proc, CurrentTime[proc],
893                 FindWork,
894                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
895       goto next_thread; 
896       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
897       break; // now actually run the thread; DaH Qu'vam yImuHbej 
898
899     case FetchNode:
900       do_the_fetchnode(event);
901       goto next_thread;             /* handle next event in event queue  */
902       
903     case GlobalBlock:
904       do_the_globalblock(event);
905       goto next_thread;             /* handle next event in event queue  */
906       
907     case FetchReply:
908       do_the_fetchreply(event);
909       goto next_thread;             /* handle next event in event queue  */
910       
911     case UnblockThread:   /* Move from the blocked queue to the tail of */
912       do_the_unblock(event);
913       goto next_thread;             /* handle next event in event queue  */
914       
915     case ResumeThread:  /* Move from the blocked queue to the tail of */
916       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
917       event->tso->gran.blocktime += 
918         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
919       do_the_startthread(event);
920       goto next_thread;             /* handle next event in event queue  */
921       
922     case StartThread:
923       do_the_startthread(event);
924       goto next_thread;             /* handle next event in event queue  */
925       
926     case MoveThread:
927       do_the_movethread(event);
928       goto next_thread;             /* handle next event in event queue  */
929       
930     case MoveSpark:
931       do_the_movespark(event);
932       goto next_thread;             /* handle next event in event queue  */
933       
934     case FindWork:
935       do_the_findwork(event);
936       goto next_thread;             /* handle next event in event queue  */
937       
938     default:
939       barf("Illegal event type %u\n", event->evttype);
940     }  /* switch */
941     
942     /* This point was scheduler_loop in the old RTS */
943
944     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
945
946     TimeOfLastEvent = CurrentTime[CurrentProc];
947     TimeOfNextEvent = get_time_of_next_event();
948     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
949     // CurrentTSO = ThreadQueueHd;
950
951     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
952                          TimeOfNextEvent));
953
954     if (RtsFlags.GranFlags.Light) 
955       GranSimLight_leave_system(event, &ActiveTSO); 
956
957     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
958
959     IF_DEBUG(gran, 
960              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
961
962     /* in a GranSim setup the TSO stays on the run queue */
963     t = CurrentTSO;
964     /* Take a thread from the run queue. */
965     POP_RUN_QUEUE(t); // take_off_run_queue(t);
966
967     IF_DEBUG(gran, 
968              debugBelch("GRAN: About to run current thread, which is\n");
969              G_TSO(t,5));
970
971     context_switch = 0; // turned on via GranYield, checking events and time slice
972
973     IF_DEBUG(gran, 
974              DumpGranEvent(GR_SCHEDULE, t));
975
976     procStatus[CurrentProc] = Busy;
977 }
978 #endif // GRAN
979
980 /* ----------------------------------------------------------------------------
981  * Send pending messages (PARALLEL_HASKELL only)
982  * ------------------------------------------------------------------------- */
983
984 #if defined(PARALLEL_HASKELL)
985 static StgTSO *
986 scheduleSendPendingMessages(void)
987 {
988     StgSparkPool *pool;
989     rtsSpark spark;
990     StgTSO *t;
991
992 # if defined(PAR) // global Mem.Mgmt., omit for now
993     if (PendingFetches != END_BF_QUEUE) {
994         processFetches();
995     }
996 # endif
997     
998     if (RtsFlags.ParFlags.BufferTime) {
999         // if we use message buffering, we must send away all message
1000         // packets which have become too old...
1001         sendOldBuffers(); 
1002     }
1003 }
1004 #endif
1005
1006 /* ----------------------------------------------------------------------------
1007  * Activate spark threads (PARALLEL_HASKELL only)
1008  * ------------------------------------------------------------------------- */
1009
1010 #if defined(PARALLEL_HASKELL)
1011 static void
1012 scheduleActivateSpark(void)
1013 {
1014 #if defined(SPARKS)
1015   ASSERT(emptyRunQueue());
1016 /* We get here if the run queue is empty and want some work.
1017    We try to turn a spark into a thread, and add it to the run queue,
1018    from where it will be picked up in the next iteration of the scheduler
1019    loop.
1020 */
1021
1022       /* :-[  no local threads => look out for local sparks */
1023       /* the spark pool for the current PE */
1024       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1025       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1026           pool->hd < pool->tl) {
1027         /* 
1028          * ToDo: add GC code check that we really have enough heap afterwards!!
1029          * Old comment:
1030          * If we're here (no runnable threads) and we have pending
1031          * sparks, we must have a space problem.  Get enough space
1032          * to turn one of those pending sparks into a
1033          * thread... 
1034          */
1035
1036         spark = findSpark(rtsFalse);            /* get a spark */
1037         if (spark != (rtsSpark) NULL) {
1038           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1039           IF_PAR_DEBUG(fish, // schedule,
1040                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1041                              tso->id, tso, advisory_thread_count));
1042
1043           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1044             IF_PAR_DEBUG(fish, // schedule,
1045                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1046                             spark));
1047             return rtsFalse; /* failed to generate a thread */
1048           }                  /* otherwise fall through & pick-up new tso */
1049         } else {
1050           IF_PAR_DEBUG(fish, // schedule,
1051                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1052                              spark_queue_len(pool)));
1053           return rtsFalse;  /* failed to generate a thread */
1054         }
1055         return rtsTrue;  /* success in generating a thread */
1056   } else { /* no more threads permitted or pool empty */
1057     return rtsFalse;  /* failed to generateThread */
1058   }
1059 #else
1060   tso = NULL; // avoid compiler warning only
1061   return rtsFalse;  /* dummy in non-PAR setup */
1062 #endif // SPARKS
1063 }
1064 #endif // PARALLEL_HASKELL
1065
1066 /* ----------------------------------------------------------------------------
1067  * Get work from a remote node (PARALLEL_HASKELL only)
1068  * ------------------------------------------------------------------------- */
1069     
1070 #if defined(PARALLEL_HASKELL)
1071 static rtsBool
1072 scheduleGetRemoteWork(rtsBool *receivedFinish)
1073 {
1074   ASSERT(emptyRunQueue());
1075
1076   if (RtsFlags.ParFlags.BufferTime) {
1077         IF_PAR_DEBUG(verbose, 
1078                 debugBelch("...send all pending data,"));
1079         {
1080           nat i;
1081           for (i=1; i<=nPEs; i++)
1082             sendImmediately(i); // send all messages away immediately
1083         }
1084   }
1085 # ifndef SPARKS
1086         //++EDEN++ idle() , i.e. send all buffers, wait for work
1087         // suppress fishing in EDEN... just look for incoming messages
1088         // (blocking receive)
1089   IF_PAR_DEBUG(verbose, 
1090                debugBelch("...wait for incoming messages...\n"));
1091   *receivedFinish = processMessages(); // blocking receive...
1092
1093         // and reenter scheduling loop after having received something
1094         // (return rtsFalse below)
1095
1096 # else /* activate SPARKS machinery */
1097 /* We get here, if we have no work, tried to activate a local spark, but still
1098    have no work. We try to get a remote spark, by sending a FISH message.
1099    Thread migration should be added here, and triggered when a sequence of 
1100    fishes returns without work. */
1101         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1102
1103       /* =8-[  no local sparks => look for work on other PEs */
1104         /*
1105          * We really have absolutely no work.  Send out a fish
1106          * (there may be some out there already), and wait for
1107          * something to arrive.  We clearly can't run any threads
1108          * until a SCHEDULE or RESUME arrives, and so that's what
1109          * we're hoping to see.  (Of course, we still have to
1110          * respond to other types of messages.)
1111          */
1112         rtsTime now = msTime() /*CURRENT_TIME*/;
1113         IF_PAR_DEBUG(verbose, 
1114                      debugBelch("--  now=%ld\n", now));
1115         IF_PAR_DEBUG(fish, // verbose,
1116              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1117                  (last_fish_arrived_at!=0 &&
1118                   last_fish_arrived_at+delay > now)) {
1119                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1120                      now, last_fish_arrived_at+delay, 
1121                      last_fish_arrived_at,
1122                      delay);
1123              });
1124   
1125         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1126             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1127           if (last_fish_arrived_at==0 ||
1128               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1129             /* outstandingFishes is set in sendFish, processFish;
1130                avoid flooding system with fishes via delay */
1131     next_fish_to_send_at = 0;  
1132   } else {
1133     /* ToDo: this should be done in the main scheduling loop to avoid the
1134              busy wait here; not so bad if fish delay is very small  */
1135     int iq = 0; // DEBUGGING -- HWL
1136     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1137     /* send a fish when ready, but process messages that arrive in the meantime */
1138     do {
1139       if (PacketsWaiting()) {
1140         iq++; // DEBUGGING
1141         *receivedFinish = processMessages();
1142       }
1143       now = msTime();
1144     } while (!*receivedFinish || now<next_fish_to_send_at);
1145     // JB: This means the fish could become obsolete, if we receive
1146     // work. Better check for work again? 
1147     // last line: while (!receivedFinish || !haveWork || now<...)
1148     // next line: if (receivedFinish || haveWork )
1149
1150     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1151       return rtsFalse;  // NB: this will leave scheduler loop
1152                         // immediately after return!
1153                           
1154     IF_PAR_DEBUG(fish, // verbose,
1155                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1156
1157   }
1158
1159     // JB: IMHO, this should all be hidden inside sendFish(...)
1160     /* pe = choosePE(); 
1161        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1162                 NEW_FISH_HUNGER);
1163
1164     // Global statistics: count no. of fishes
1165     if (RtsFlags.ParFlags.ParStats.Global &&
1166          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1167            globalParStats.tot_fish_mess++;
1168            }
1169     */ 
1170
1171   /* delayed fishes must have been sent by now! */
1172   next_fish_to_send_at = 0;  
1173   }
1174       
1175   *receivedFinish = processMessages();
1176 # endif /* SPARKS */
1177
1178  return rtsFalse;
1179  /* NB: this function always returns rtsFalse, meaning the scheduler
1180     loop continues with the next iteration; 
1181     rationale: 
1182       return code means success in finding work; we enter this function
1183       if there is no local work, thus have to send a fish which takes
1184       time until it arrives with work; in the meantime we should process
1185       messages in the main loop;
1186  */
1187 }
1188 #endif // PARALLEL_HASKELL
1189
1190 /* ----------------------------------------------------------------------------
1191  * PAR/GRAN: Report stats & debugging info(?)
1192  * ------------------------------------------------------------------------- */
1193
1194 #if defined(PAR) || defined(GRAN)
1195 static void
1196 scheduleGranParReport(void)
1197 {
1198   ASSERT(run_queue_hd != END_TSO_QUEUE);
1199
1200   /* Take a thread from the run queue, if we have work */
1201   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1202
1203     /* If this TSO has got its outport closed in the meantime, 
1204      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1205      * It has to be marked as TH_DEAD for this purpose.
1206      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1207
1208 JB: TODO: investigate wether state change field could be nuked
1209      entirely and replaced by the normal tso state (whatnext
1210      field). All we want to do is to kill tsos from outside.
1211      */
1212
1213     /* ToDo: write something to the log-file
1214     if (RTSflags.ParFlags.granSimStats && !sameThread)
1215         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1216
1217     CurrentTSO = t;
1218     */
1219     /* the spark pool for the current PE */
1220     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1221
1222     IF_DEBUG(scheduler, 
1223              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1224                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1225
1226     IF_PAR_DEBUG(fish,
1227              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1228                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1229
1230     if (RtsFlags.ParFlags.ParStats.Full && 
1231         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1232         (emitSchedule || // forced emit
1233          (t && LastTSO && t->id != LastTSO->id))) {
1234       /* 
1235          we are running a different TSO, so write a schedule event to log file
1236          NB: If we use fair scheduling we also have to write  a deschedule 
1237              event for LastTSO; with unfair scheduling we know that the
1238              previous tso has blocked whenever we switch to another tso, so
1239              we don't need it in GUM for now
1240       */
1241       IF_PAR_DEBUG(fish, // schedule,
1242                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1243
1244       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1245                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1246       emitSchedule = rtsFalse;
1247     }
1248 }     
1249 #endif
1250
1251 /* ----------------------------------------------------------------------------
1252  * After running a thread...
1253  * ------------------------------------------------------------------------- */
1254
1255 static void
1256 schedulePostRunThread(void)
1257 {
1258 #if defined(PAR)
1259     /* HACK 675: if the last thread didn't yield, make sure to print a 
1260        SCHEDULE event to the log file when StgRunning the next thread, even
1261        if it is the same one as before */
1262     LastTSO = t; 
1263     TimeOfLastYield = CURRENT_TIME;
1264 #endif
1265
1266   /* some statistics gathering in the parallel case */
1267
1268 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1269   switch (ret) {
1270     case HeapOverflow:
1271 # if defined(GRAN)
1272       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1273       globalGranStats.tot_heapover++;
1274 # elif defined(PAR)
1275       globalParStats.tot_heapover++;
1276 # endif
1277       break;
1278
1279      case StackOverflow:
1280 # if defined(GRAN)
1281       IF_DEBUG(gran, 
1282                DumpGranEvent(GR_DESCHEDULE, t));
1283       globalGranStats.tot_stackover++;
1284 # elif defined(PAR)
1285       // IF_DEBUG(par, 
1286       // DumpGranEvent(GR_DESCHEDULE, t);
1287       globalParStats.tot_stackover++;
1288 # endif
1289       break;
1290
1291     case ThreadYielding:
1292 # if defined(GRAN)
1293       IF_DEBUG(gran, 
1294                DumpGranEvent(GR_DESCHEDULE, t));
1295       globalGranStats.tot_yields++;
1296 # elif defined(PAR)
1297       // IF_DEBUG(par, 
1298       // DumpGranEvent(GR_DESCHEDULE, t);
1299       globalParStats.tot_yields++;
1300 # endif
1301       break; 
1302
1303     case ThreadBlocked:
1304 # if defined(GRAN)
1305       IF_DEBUG(scheduler,
1306                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1307                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1308                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1309                if (t->block_info.closure!=(StgClosure*)NULL)
1310                  print_bq(t->block_info.closure);
1311                debugBelch("\n"));
1312
1313       // ??? needed; should emit block before
1314       IF_DEBUG(gran, 
1315                DumpGranEvent(GR_DESCHEDULE, t)); 
1316       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1317       /*
1318         ngoq Dogh!
1319       ASSERT(procStatus[CurrentProc]==Busy || 
1320               ((procStatus[CurrentProc]==Fetching) && 
1321               (t->block_info.closure!=(StgClosure*)NULL)));
1322       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1323           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1324             procStatus[CurrentProc]==Fetching)) 
1325         procStatus[CurrentProc] = Idle;
1326       */
1327 # elif defined(PAR)
1328 //++PAR++  blockThread() writes the event (change?)
1329 # endif
1330     break;
1331
1332   case ThreadFinished:
1333     break;
1334
1335   default:
1336     barf("parGlobalStats: unknown return code");
1337     break;
1338     }
1339 #endif
1340 }
1341
1342 /* -----------------------------------------------------------------------------
1343  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1344  * -------------------------------------------------------------------------- */
1345
1346 static rtsBool
1347 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1348 {
1349     // did the task ask for a large block?
1350     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1351         // if so, get one and push it on the front of the nursery.
1352         bdescr *bd;
1353         lnat blocks;
1354         
1355         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1356         
1357         IF_DEBUG(scheduler,
1358                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1359                             (long)t->id, whatNext_strs[t->what_next], blocks));
1360         
1361         // don't do this if the nursery is (nearly) full, we'll GC first.
1362         if (cap->r.rCurrentNursery->link != NULL ||
1363             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1364                                                // if the nursery has only one block.
1365             
1366             ACQUIRE_SM_LOCK
1367             bd = allocGroup( blocks );
1368             RELEASE_SM_LOCK
1369             cap->r.rNursery->n_blocks += blocks;
1370             
1371             // link the new group into the list
1372             bd->link = cap->r.rCurrentNursery;
1373             bd->u.back = cap->r.rCurrentNursery->u.back;
1374             if (cap->r.rCurrentNursery->u.back != NULL) {
1375                 cap->r.rCurrentNursery->u.back->link = bd;
1376             } else {
1377 #if !defined(SMP)
1378                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1379                        g0s0 == cap->r.rNursery);
1380 #endif
1381                 cap->r.rNursery->blocks = bd;
1382             }             
1383             cap->r.rCurrentNursery->u.back = bd;
1384             
1385             // initialise it as a nursery block.  We initialise the
1386             // step, gen_no, and flags field of *every* sub-block in
1387             // this large block, because this is easier than making
1388             // sure that we always find the block head of a large
1389             // block whenever we call Bdescr() (eg. evacuate() and
1390             // isAlive() in the GC would both have to do this, at
1391             // least).
1392             { 
1393                 bdescr *x;
1394                 for (x = bd; x < bd + blocks; x++) {
1395                     x->step = cap->r.rNursery;
1396                     x->gen_no = 0;
1397                     x->flags = 0;
1398                 }
1399             }
1400             
1401             // This assert can be a killer if the app is doing lots
1402             // of large block allocations.
1403             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1404             
1405             // now update the nursery to point to the new block
1406             cap->r.rCurrentNursery = bd;
1407             
1408             // we might be unlucky and have another thread get on the
1409             // run queue before us and steal the large block, but in that
1410             // case the thread will just end up requesting another large
1411             // block.
1412             pushOnRunQueue(cap,t);
1413             return rtsFalse;  /* not actually GC'ing */
1414         }
1415     }
1416     
1417     IF_DEBUG(scheduler,
1418              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1419                         (long)t->id, whatNext_strs[t->what_next]));
1420 #if defined(GRAN)
1421     ASSERT(!is_on_queue(t,CurrentProc));
1422 #elif defined(PARALLEL_HASKELL)
1423     /* Currently we emit a DESCHEDULE event before GC in GUM.
1424        ToDo: either add separate event to distinguish SYSTEM time from rest
1425        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1426     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1427         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1428                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1429         emitSchedule = rtsTrue;
1430     }
1431 #endif
1432       
1433     pushOnRunQueue(cap,t);
1434     return rtsTrue;
1435     /* actual GC is done at the end of the while loop in schedule() */
1436 }
1437
1438 /* -----------------------------------------------------------------------------
1439  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1440  * -------------------------------------------------------------------------- */
1441
1442 static void
1443 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1444 {
1445     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1446                                   (long)t->id, whatNext_strs[t->what_next]));
1447     /* just adjust the stack for this thread, then pop it back
1448      * on the run queue.
1449      */
1450     { 
1451         /* enlarge the stack */
1452         StgTSO *new_t = threadStackOverflow(cap, t);
1453         
1454         /* This TSO has moved, so update any pointers to it from the
1455          * main thread stack.  It better not be on any other queues...
1456          * (it shouldn't be).
1457          */
1458         if (task->tso != NULL) {
1459             task->tso = new_t;
1460         }
1461         pushOnRunQueue(cap,new_t);
1462     }
1463 }
1464
1465 /* -----------------------------------------------------------------------------
1466  * Handle a thread that returned to the scheduler with ThreadYielding
1467  * -------------------------------------------------------------------------- */
1468
1469 static rtsBool
1470 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1471 {
1472     // Reset the context switch flag.  We don't do this just before
1473     // running the thread, because that would mean we would lose ticks
1474     // during GC, which can lead to unfair scheduling (a thread hogs
1475     // the CPU because the tick always arrives during GC).  This way
1476     // penalises threads that do a lot of allocation, but that seems
1477     // better than the alternative.
1478     context_switch = 0;
1479     
1480     /* put the thread back on the run queue.  Then, if we're ready to
1481      * GC, check whether this is the last task to stop.  If so, wake
1482      * up the GC thread.  getThread will block during a GC until the
1483      * GC is finished.
1484      */
1485     IF_DEBUG(scheduler,
1486              if (t->what_next != prev_what_next) {
1487                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1488                             (long)t->id, whatNext_strs[t->what_next]);
1489              } else {
1490                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1491                             (long)t->id, whatNext_strs[t->what_next]);
1492              }
1493         );
1494     
1495     IF_DEBUG(sanity,
1496              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1497              checkTSO(t));
1498     ASSERT(t->link == END_TSO_QUEUE);
1499     
1500     // Shortcut if we're just switching evaluators: don't bother
1501     // doing stack squeezing (which can be expensive), just run the
1502     // thread.
1503     if (t->what_next != prev_what_next) {
1504         return rtsTrue;
1505     }
1506     
1507 #if defined(GRAN)
1508     ASSERT(!is_on_queue(t,CurrentProc));
1509       
1510     IF_DEBUG(sanity,
1511              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1512              checkThreadQsSanity(rtsTrue));
1513
1514 #endif
1515
1516     addToRunQueue(cap,t);
1517
1518 #if defined(GRAN)
1519     /* add a ContinueThread event to actually process the thread */
1520     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1521               ContinueThread,
1522               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1523     IF_GRAN_DEBUG(bq, 
1524                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1525                   G_EVENTQ(0);
1526                   G_CURR_THREADQ(0));
1527 #endif
1528     return rtsFalse;
1529 }
1530
1531 /* -----------------------------------------------------------------------------
1532  * Handle a thread that returned to the scheduler with ThreadBlocked
1533  * -------------------------------------------------------------------------- */
1534
1535 static void
1536 scheduleHandleThreadBlocked( StgTSO *t
1537 #if !defined(GRAN) && !defined(DEBUG)
1538     STG_UNUSED
1539 #endif
1540     )
1541 {
1542 #if defined(GRAN)
1543     IF_DEBUG(scheduler,
1544              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1545                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1546              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1547     
1548     // ??? needed; should emit block before
1549     IF_DEBUG(gran, 
1550              DumpGranEvent(GR_DESCHEDULE, t)); 
1551     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1552     /*
1553       ngoq Dogh!
1554       ASSERT(procStatus[CurrentProc]==Busy || 
1555       ((procStatus[CurrentProc]==Fetching) && 
1556       (t->block_info.closure!=(StgClosure*)NULL)));
1557       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1558       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1559       procStatus[CurrentProc]==Fetching)) 
1560       procStatus[CurrentProc] = Idle;
1561     */
1562 #elif defined(PAR)
1563     IF_DEBUG(scheduler,
1564              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1565                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1566     IF_PAR_DEBUG(bq,
1567                  
1568                  if (t->block_info.closure!=(StgClosure*)NULL) 
1569                  print_bq(t->block_info.closure));
1570     
1571     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1572     blockThread(t);
1573     
1574     /* whatever we schedule next, we must log that schedule */
1575     emitSchedule = rtsTrue;
1576     
1577 #else /* !GRAN */
1578
1579       // We don't need to do anything.  The thread is blocked, and it
1580       // has tidied up its stack and placed itself on whatever queue
1581       // it needs to be on.
1582
1583 #if !defined(SMP)
1584     ASSERT(t->why_blocked != NotBlocked);
1585              // This might not be true under SMP: we don't have
1586              // exclusive access to this TSO, so someone might have
1587              // woken it up by now.  This actually happens: try
1588              // conc023 +RTS -N2.
1589 #endif
1590
1591     IF_DEBUG(scheduler,
1592              debugBelch("--<< thread %d (%s) stopped: ", 
1593                         t->id, whatNext_strs[t->what_next]);
1594              printThreadBlockage(t);
1595              debugBelch("\n"));
1596     
1597     /* Only for dumping event to log file 
1598        ToDo: do I need this in GranSim, too?
1599        blockThread(t);
1600     */
1601 #endif
1602 }
1603
1604 /* -----------------------------------------------------------------------------
1605  * Handle a thread that returned to the scheduler with ThreadFinished
1606  * -------------------------------------------------------------------------- */
1607
1608 static rtsBool
1609 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1610 {
1611     /* Need to check whether this was a main thread, and if so,
1612      * return with the return value.
1613      *
1614      * We also end up here if the thread kills itself with an
1615      * uncaught exception, see Exception.cmm.
1616      */
1617     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1618                                   t->id, whatNext_strs[t->what_next]));
1619
1620 #if defined(GRAN)
1621       endThread(t, CurrentProc); // clean-up the thread
1622 #elif defined(PARALLEL_HASKELL)
1623       /* For now all are advisory -- HWL */
1624       //if(t->priority==AdvisoryPriority) ??
1625       advisory_thread_count--; // JB: Caution with this counter, buggy!
1626       
1627 # if defined(DIST)
1628       if(t->dist.priority==RevalPriority)
1629         FinishReval(t);
1630 # endif
1631     
1632 # if defined(EDENOLD)
1633       // the thread could still have an outport... (BUG)
1634       if (t->eden.outport != -1) {
1635       // delete the outport for the tso which has finished...
1636         IF_PAR_DEBUG(eden_ports,
1637                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1638                               t->eden.outport, t->id));
1639         deleteOPT(t);
1640       }
1641       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1642       if (t->eden.epid != -1) {
1643         IF_PAR_DEBUG(eden_ports,
1644                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1645                            t->id, t->eden.epid));
1646         removeTSOfromProcess(t);
1647       }
1648 # endif 
1649
1650 # if defined(PAR)
1651       if (RtsFlags.ParFlags.ParStats.Full &&
1652           !RtsFlags.ParFlags.ParStats.Suppressed) 
1653         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1654
1655       //  t->par only contains statistics: left out for now...
1656       IF_PAR_DEBUG(fish,
1657                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1658                               t->id,t,t->par.sparkname));
1659 # endif
1660 #endif // PARALLEL_HASKELL
1661
1662       //
1663       // Check whether the thread that just completed was a bound
1664       // thread, and if so return with the result.  
1665       //
1666       // There is an assumption here that all thread completion goes
1667       // through this point; we need to make sure that if a thread
1668       // ends up in the ThreadKilled state, that it stays on the run
1669       // queue so it can be dealt with here.
1670       //
1671
1672       if (t->bound) {
1673
1674           if (t->bound != task) {
1675 #if !defined(THREADED_RTS)
1676               // Must be a bound thread that is not the topmost one.  Leave
1677               // it on the run queue until the stack has unwound to the
1678               // point where we can deal with this.  Leaving it on the run
1679               // queue also ensures that the garbage collector knows about
1680               // this thread and its return value (it gets dropped from the
1681               // all_threads list so there's no other way to find it).
1682               appendToRunQueue(cap,t);
1683               return rtsFalse;
1684 #else
1685               // this cannot happen in the threaded RTS, because a
1686               // bound thread can only be run by the appropriate Task.
1687               barf("finished bound thread that isn't mine");
1688 #endif
1689           }
1690
1691           ASSERT(task->tso == t);
1692
1693           if (t->what_next == ThreadComplete) {
1694               if (task->ret) {
1695                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1696                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1697               }
1698               task->stat = Success;
1699           } else {
1700               if (task->ret) {
1701                   *(task->ret) = NULL;
1702               }
1703               if (interrupted) {
1704                   task->stat = Interrupted;
1705               } else {
1706                   task->stat = Killed;
1707               }
1708           }
1709 #ifdef DEBUG
1710           removeThreadLabel((StgWord)task->tso->id);
1711 #endif
1712           return rtsTrue; // tells schedule() to return
1713       }
1714
1715       return rtsFalse;
1716 }
1717
1718 /* -----------------------------------------------------------------------------
1719  * Perform a heap census, if PROFILING
1720  * -------------------------------------------------------------------------- */
1721
1722 static rtsBool
1723 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1724 {
1725 #if defined(PROFILING)
1726     // When we have +RTS -i0 and we're heap profiling, do a census at
1727     // every GC.  This lets us get repeatable runs for debugging.
1728     if (performHeapProfile ||
1729         (RtsFlags.ProfFlags.profileInterval==0 &&
1730          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1731         GarbageCollect(GetRoots, rtsTrue);
1732         heapCensus();
1733         performHeapProfile = rtsFalse;
1734         return rtsTrue;  // true <=> we already GC'd
1735     }
1736 #endif
1737     return rtsFalse;
1738 }
1739
1740 /* -----------------------------------------------------------------------------
1741  * Perform a garbage collection if necessary
1742  * -------------------------------------------------------------------------- */
1743
1744 static void
1745 scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
1746 {
1747     StgTSO *t;
1748 #ifdef SMP
1749     static volatile StgWord waiting_for_gc;
1750     rtsBool was_waiting;
1751     nat i;
1752 #endif
1753
1754 #ifdef SMP
1755     // In order to GC, there must be no threads running Haskell code.
1756     // Therefore, the GC thread needs to hold *all* the capabilities,
1757     // and release them after the GC has completed.  
1758     //
1759     // This seems to be the simplest way: previous attempts involved
1760     // making all the threads with capabilities give up their
1761     // capabilities and sleep except for the *last* one, which
1762     // actually did the GC.  But it's quite hard to arrange for all
1763     // the other tasks to sleep and stay asleep.
1764     //
1765         
1766     was_waiting = cas(&waiting_for_gc, 0, 1);
1767     if (was_waiting) return;
1768
1769     for (i=0; i < n_capabilities; i++) {
1770         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1771         if (cap != &capabilities[i]) {
1772             Capability *pcap = &capabilities[i];
1773             // we better hope this task doesn't get migrated to
1774             // another Capability while we're waiting for this one.
1775             // It won't, because load balancing happens while we have
1776             // all the Capabilities, but even so it's a slightly
1777             // unsavoury invariant.
1778             task->cap = pcap;
1779             waitForReturnCapability(&pcap, task);
1780             if (pcap != &capabilities[i]) {
1781                 barf("scheduleDoGC: got the wrong capability");
1782             }
1783         }
1784     }
1785
1786     waiting_for_gc = rtsFalse;
1787 #endif
1788
1789     /* Kick any transactions which are invalid back to their
1790      * atomically frames.  When next scheduled they will try to
1791      * commit, this commit will fail and they will retry.
1792      */
1793     { 
1794         StgTSO *next;
1795
1796         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1797             if (t->what_next == ThreadRelocated) {
1798                 next = t->link;
1799             } else {
1800                 next = t->global_link;
1801                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1802                     if (!stmValidateNestOfTransactions (t -> trec)) {
1803                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1804                         
1805                         // strip the stack back to the
1806                         // ATOMICALLY_FRAME, aborting the (nested)
1807                         // transaction, and saving the stack of any
1808                         // partially-evaluated thunks on the heap.
1809                         raiseAsync_(cap, t, NULL, rtsTrue);
1810                         
1811 #ifdef REG_R1
1812                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1813 #endif
1814                     }
1815                 }
1816             }
1817         }
1818     }
1819     
1820     // so this happens periodically:
1821     scheduleCheckBlackHoles(cap);
1822     
1823     IF_DEBUG(scheduler, printAllThreads());
1824
1825     /* everybody back, start the GC.
1826      * Could do it in this thread, or signal a condition var
1827      * to do it in another thread.  Either way, we need to
1828      * broadcast on gc_pending_cond afterward.
1829      */
1830 #if defined(THREADED_RTS)
1831     IF_DEBUG(scheduler,sched_belch("doing GC"));
1832 #endif
1833     GarbageCollect(GetRoots, force_major);
1834     
1835 #if defined(SMP)
1836     // release our stash of capabilities.
1837     for (i = 0; i < n_capabilities; i++) {
1838         if (cap != &capabilities[i]) {
1839             task->cap = &capabilities[i];
1840             releaseCapability(&capabilities[i]);
1841         }
1842     }
1843     task->cap = cap;
1844 #endif
1845
1846 #if defined(GRAN)
1847     /* add a ContinueThread event to continue execution of current thread */
1848     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1849               ContinueThread,
1850               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1851     IF_GRAN_DEBUG(bq, 
1852                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1853                   G_EVENTQ(0);
1854                   G_CURR_THREADQ(0));
1855 #endif /* GRAN */
1856 }
1857
1858 /* ---------------------------------------------------------------------------
1859  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1860  * used by Control.Concurrent for error checking.
1861  * ------------------------------------------------------------------------- */
1862  
1863 StgBool
1864 rtsSupportsBoundThreads(void)
1865 {
1866 #if defined(THREADED_RTS)
1867   return rtsTrue;
1868 #else
1869   return rtsFalse;
1870 #endif
1871 }
1872
1873 /* ---------------------------------------------------------------------------
1874  * isThreadBound(tso): check whether tso is bound to an OS thread.
1875  * ------------------------------------------------------------------------- */
1876  
1877 StgBool
1878 isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
1879 {
1880 #if defined(THREADED_RTS)
1881   return (tso->bound != NULL);
1882 #endif
1883   return rtsFalse;
1884 }
1885
1886 /* ---------------------------------------------------------------------------
1887  * Singleton fork(). Do not copy any running threads.
1888  * ------------------------------------------------------------------------- */
1889
1890 #if !defined(mingw32_HOST_OS) && !defined(SMP)
1891 #define FORKPROCESS_PRIMOP_SUPPORTED
1892 #endif
1893
1894 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1895 static void 
1896 deleteThreadImmediately(Capability *cap, StgTSO *tso);
1897 #endif
1898 StgInt
1899 forkProcess(HsStablePtr *entry
1900 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1901             STG_UNUSED
1902 #endif
1903            )
1904 {
1905 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1906     pid_t pid;
1907     StgTSO* t,*next;
1908     Task *task;
1909     Capability *cap;
1910     
1911     IF_DEBUG(scheduler,sched_belch("forking!"));
1912     
1913     // ToDo: for SMP, we should probably acquire *all* the capabilities
1914     cap = rts_lock();
1915     
1916     pid = fork();
1917     
1918     if (pid) { // parent
1919         
1920         // just return the pid
1921         return pid;
1922         
1923     } else { // child
1924         
1925         // delete all threads
1926         cap->run_queue_hd = END_TSO_QUEUE;
1927         cap->run_queue_tl = END_TSO_QUEUE;
1928         
1929         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1930             next = t->link;
1931             
1932             // don't allow threads to catch the ThreadKilled exception
1933             deleteThreadImmediately(cap,t);
1934         }
1935         
1936         // wipe the main thread list
1937         while ((task = all_tasks) != NULL) {
1938             all_tasks = task->all_link;
1939             discardTask(task);
1940         }
1941         
1942         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1943         rts_checkSchedStatus("forkProcess",cap);
1944         
1945         rts_unlock(cap);
1946         hs_exit();                      // clean up and exit
1947         stg_exit(EXIT_SUCCESS);
1948     }
1949 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1950     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1951     return -1;
1952 #endif
1953 }
1954
1955 /* ---------------------------------------------------------------------------
1956  * Delete the threads on the run queue of the current capability.
1957  * ------------------------------------------------------------------------- */
1958    
1959 static void
1960 deleteRunQueue (Capability *cap)
1961 {
1962     StgTSO *t, *next;
1963     for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
1964         ASSERT(t->what_next != ThreadRelocated);
1965         next = t->link;
1966         deleteThread(cap, t);
1967     }
1968 }
1969
1970 /* startThread and  insertThread are now in GranSim.c -- HWL */
1971
1972
1973 /* -----------------------------------------------------------------------------
1974    Managing the suspended_ccalling_tasks list.
1975    Locks required: sched_mutex
1976    -------------------------------------------------------------------------- */
1977
1978 STATIC_INLINE void
1979 suspendTask (Capability *cap, Task *task)
1980 {
1981     ASSERT(task->next == NULL && task->prev == NULL);
1982     task->next = cap->suspended_ccalling_tasks;
1983     task->prev = NULL;
1984     if (cap->suspended_ccalling_tasks) {
1985         cap->suspended_ccalling_tasks->prev = task;
1986     }
1987     cap->suspended_ccalling_tasks = task;
1988 }
1989
1990 STATIC_INLINE void
1991 recoverSuspendedTask (Capability *cap, Task *task)
1992 {
1993     if (task->prev) {
1994         task->prev->next = task->next;
1995     } else {
1996         ASSERT(cap->suspended_ccalling_tasks == task);
1997         cap->suspended_ccalling_tasks = task->next;
1998     }
1999     if (task->next) {
2000         task->next->prev = task->prev;
2001     }
2002     task->next = task->prev = NULL;
2003 }
2004
2005 /* ---------------------------------------------------------------------------
2006  * Suspending & resuming Haskell threads.
2007  * 
2008  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2009  * its capability before calling the C function.  This allows another
2010  * task to pick up the capability and carry on running Haskell
2011  * threads.  It also means that if the C call blocks, it won't lock
2012  * the whole system.
2013  *
2014  * The Haskell thread making the C call is put to sleep for the
2015  * duration of the call, on the susepended_ccalling_threads queue.  We
2016  * give out a token to the task, which it can use to resume the thread
2017  * on return from the C function.
2018  * ------------------------------------------------------------------------- */
2019    
2020 void *
2021 suspendThread (StgRegTable *reg)
2022 {
2023   Capability *cap;
2024   int saved_errno = errno;
2025   StgTSO *tso;
2026   Task *task;
2027
2028   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2029    */
2030   cap = regTableToCapability(reg);
2031
2032   task = cap->running_task;
2033   tso = cap->r.rCurrentTSO;
2034
2035   IF_DEBUG(scheduler,
2036            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2037
2038   // XXX this might not be necessary --SDM
2039   tso->what_next = ThreadRunGHC;
2040
2041   threadPaused(tso);
2042
2043   if(tso->blocked_exceptions == NULL)  {
2044       tso->why_blocked = BlockedOnCCall;
2045       tso->blocked_exceptions = END_TSO_QUEUE;
2046   } else {
2047       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2048   }
2049
2050   // Hand back capability
2051   task->suspended_tso = tso;
2052
2053   ACQUIRE_LOCK(&cap->lock);
2054
2055   suspendTask(cap,task);
2056   cap->in_haskell = rtsFalse;
2057   releaseCapability_(cap);
2058   
2059   RELEASE_LOCK(&cap->lock);
2060
2061 #if defined(THREADED_RTS)
2062   /* Preparing to leave the RTS, so ensure there's a native thread/task
2063      waiting to take over.
2064   */
2065   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2066 #endif
2067
2068   errno = saved_errno;
2069   return task;
2070 }
2071
2072 StgRegTable *
2073 resumeThread (void *task_)
2074 {
2075     StgTSO *tso;
2076     Capability *cap;
2077     int saved_errno = errno;
2078     Task *task = task_;
2079
2080     cap = task->cap;
2081     // Wait for permission to re-enter the RTS with the result.
2082     waitForReturnCapability(&cap,task);
2083     // we might be on a different capability now... but if so, our
2084     // entry on the suspended_ccalling_tasks list will also have been
2085     // migrated.
2086
2087     // Remove the thread from the suspended list
2088     recoverSuspendedTask(cap,task);
2089
2090     tso = task->suspended_tso;
2091     task->suspended_tso = NULL;
2092     tso->link = END_TSO_QUEUE;
2093     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2094     
2095     if (tso->why_blocked == BlockedOnCCall) {
2096         awakenBlockedQueue(cap,tso->blocked_exceptions);
2097         tso->blocked_exceptions = NULL;
2098     }
2099     
2100     /* Reset blocking status */
2101     tso->why_blocked  = NotBlocked;
2102     
2103     cap->r.rCurrentTSO = tso;
2104     cap->in_haskell = rtsTrue;
2105     errno = saved_errno;
2106
2107     return &cap->r;
2108 }
2109
2110 /* ---------------------------------------------------------------------------
2111  * Comparing Thread ids.
2112  *
2113  * This is used from STG land in the implementation of the
2114  * instances of Eq/Ord for ThreadIds.
2115  * ------------------------------------------------------------------------ */
2116
2117 int
2118 cmp_thread(StgPtr tso1, StgPtr tso2) 
2119
2120   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2121   StgThreadID id2 = ((StgTSO *)tso2)->id;
2122  
2123   if (id1 < id2) return (-1);
2124   if (id1 > id2) return 1;
2125   return 0;
2126 }
2127
2128 /* ---------------------------------------------------------------------------
2129  * Fetching the ThreadID from an StgTSO.
2130  *
2131  * This is used in the implementation of Show for ThreadIds.
2132  * ------------------------------------------------------------------------ */
2133 int
2134 rts_getThreadId(StgPtr tso) 
2135 {
2136   return ((StgTSO *)tso)->id;
2137 }
2138
2139 #ifdef DEBUG
2140 void
2141 labelThread(StgPtr tso, char *label)
2142 {
2143   int len;
2144   void *buf;
2145
2146   /* Caveat: Once set, you can only set the thread name to "" */
2147   len = strlen(label)+1;
2148   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2149   strncpy(buf,label,len);
2150   /* Update will free the old memory for us */
2151   updateThreadLabel(((StgTSO *)tso)->id,buf);
2152 }
2153 #endif /* DEBUG */
2154
2155 /* ---------------------------------------------------------------------------
2156    Create a new thread.
2157
2158    The new thread starts with the given stack size.  Before the
2159    scheduler can run, however, this thread needs to have a closure
2160    (and possibly some arguments) pushed on its stack.  See
2161    pushClosure() in Schedule.h.
2162
2163    createGenThread() and createIOThread() (in SchedAPI.h) are
2164    convenient packaged versions of this function.
2165
2166    currently pri (priority) is only used in a GRAN setup -- HWL
2167    ------------------------------------------------------------------------ */
2168 #if defined(GRAN)
2169 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2170 StgTSO *
2171 createThread(nat size, StgInt pri)
2172 #else
2173 StgTSO *
2174 createThread(Capability *cap, nat size)
2175 #endif
2176 {
2177     StgTSO *tso;
2178     nat stack_size;
2179
2180     /* sched_mutex is *not* required */
2181
2182     /* First check whether we should create a thread at all */
2183 #if defined(PARALLEL_HASKELL)
2184     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2185     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2186         threadsIgnored++;
2187         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2188                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2189         return END_TSO_QUEUE;
2190     }
2191     threadsCreated++;
2192 #endif
2193
2194 #if defined(GRAN)
2195     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2196 #endif
2197
2198     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2199
2200     /* catch ridiculously small stack sizes */
2201     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2202         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2203     }
2204
2205     stack_size = size - TSO_STRUCT_SIZEW;
2206     
2207     tso = (StgTSO *)allocateLocal(cap, size);
2208     TICK_ALLOC_TSO(stack_size, 0);
2209
2210     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2211 #if defined(GRAN)
2212     SET_GRAN_HDR(tso, ThisPE);
2213 #endif
2214
2215     // Always start with the compiled code evaluator
2216     tso->what_next = ThreadRunGHC;
2217
2218     tso->why_blocked  = NotBlocked;
2219     tso->blocked_exceptions = NULL;
2220     
2221     tso->saved_errno = 0;
2222     tso->bound = NULL;
2223     
2224     tso->stack_size     = stack_size;
2225     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2226                           - TSO_STRUCT_SIZEW;
2227     tso->sp             = (P_)&(tso->stack) + stack_size;
2228
2229     tso->trec = NO_TREC;
2230     
2231 #ifdef PROFILING
2232     tso->prof.CCCS = CCS_MAIN;
2233 #endif
2234     
2235   /* put a stop frame on the stack */
2236     tso->sp -= sizeofW(StgStopFrame);
2237     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2238     tso->link = END_TSO_QUEUE;
2239     
2240   // ToDo: check this
2241 #if defined(GRAN)
2242     /* uses more flexible routine in GranSim */
2243     insertThread(tso, CurrentProc);
2244 #else
2245     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2246      * from its creation
2247      */
2248 #endif
2249     
2250 #if defined(GRAN) 
2251     if (RtsFlags.GranFlags.GranSimStats.Full) 
2252         DumpGranEvent(GR_START,tso);
2253 #elif defined(PARALLEL_HASKELL)
2254     if (RtsFlags.ParFlags.ParStats.Full) 
2255         DumpGranEvent(GR_STARTQ,tso);
2256     /* HACk to avoid SCHEDULE 
2257        LastTSO = tso; */
2258 #endif
2259     
2260     /* Link the new thread on the global thread list.
2261      */
2262     ACQUIRE_LOCK(&sched_mutex);
2263     tso->id = next_thread_id++;  // while we have the mutex
2264     tso->global_link = all_threads;
2265     all_threads = tso;
2266     RELEASE_LOCK(&sched_mutex);
2267     
2268 #if defined(DIST)
2269     tso->dist.priority = MandatoryPriority; //by default that is...
2270 #endif
2271     
2272 #if defined(GRAN)
2273     tso->gran.pri = pri;
2274 # if defined(DEBUG)
2275     tso->gran.magic = TSO_MAGIC; // debugging only
2276 # endif
2277     tso->gran.sparkname   = 0;
2278     tso->gran.startedat   = CURRENT_TIME; 
2279     tso->gran.exported    = 0;
2280     tso->gran.basicblocks = 0;
2281     tso->gran.allocs      = 0;
2282     tso->gran.exectime    = 0;
2283     tso->gran.fetchtime   = 0;
2284     tso->gran.fetchcount  = 0;
2285     tso->gran.blocktime   = 0;
2286     tso->gran.blockcount  = 0;
2287     tso->gran.blockedat   = 0;
2288     tso->gran.globalsparks = 0;
2289     tso->gran.localsparks  = 0;
2290     if (RtsFlags.GranFlags.Light)
2291         tso->gran.clock  = Now; /* local clock */
2292     else
2293         tso->gran.clock  = 0;
2294     
2295     IF_DEBUG(gran,printTSO(tso));
2296 #elif defined(PARALLEL_HASKELL)
2297 # if defined(DEBUG)
2298     tso->par.magic = TSO_MAGIC; // debugging only
2299 # endif
2300     tso->par.sparkname   = 0;
2301     tso->par.startedat   = CURRENT_TIME; 
2302     tso->par.exported    = 0;
2303     tso->par.basicblocks = 0;
2304     tso->par.allocs      = 0;
2305     tso->par.exectime    = 0;
2306     tso->par.fetchtime   = 0;
2307     tso->par.fetchcount  = 0;
2308     tso->par.blocktime   = 0;
2309     tso->par.blockcount  = 0;
2310     tso->par.blockedat   = 0;
2311     tso->par.globalsparks = 0;
2312     tso->par.localsparks  = 0;
2313 #endif
2314     
2315 #if defined(GRAN)
2316     globalGranStats.tot_threads_created++;
2317     globalGranStats.threads_created_on_PE[CurrentProc]++;
2318     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2319     globalGranStats.tot_sq_probes++;
2320 #elif defined(PARALLEL_HASKELL)
2321     // collect parallel global statistics (currently done together with GC stats)
2322     if (RtsFlags.ParFlags.ParStats.Global &&
2323         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2324         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2325         globalParStats.tot_threads_created++;
2326     }
2327 #endif 
2328     
2329 #if defined(GRAN)
2330     IF_GRAN_DEBUG(pri,
2331                   sched_belch("==__ schedule: Created TSO %d (%p);",
2332                               CurrentProc, tso, tso->id));
2333 #elif defined(PARALLEL_HASKELL)
2334     IF_PAR_DEBUG(verbose,
2335                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2336                              (long)tso->id, tso, advisory_thread_count));
2337 #else
2338     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2339                                    (long)tso->id, (long)tso->stack_size));
2340 #endif    
2341     return tso;
2342 }
2343
2344 #if defined(PAR)
2345 /* RFP:
2346    all parallel thread creation calls should fall through the following routine.
2347 */
2348 StgTSO *
2349 createThreadFromSpark(rtsSpark spark) 
2350 { StgTSO *tso;
2351   ASSERT(spark != (rtsSpark)NULL);
2352 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2353   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2354   { threadsIgnored++;
2355     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2356           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2357     return END_TSO_QUEUE;
2358   }
2359   else
2360   { threadsCreated++;
2361     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2362     if (tso==END_TSO_QUEUE)     
2363       barf("createSparkThread: Cannot create TSO");
2364 #if defined(DIST)
2365     tso->priority = AdvisoryPriority;
2366 #endif
2367     pushClosure(tso,spark);
2368     addToRunQueue(tso);
2369     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2370   }
2371   return tso;
2372 }
2373 #endif
2374
2375 /*
2376   Turn a spark into a thread.
2377   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2378 */
2379 #if 0
2380 StgTSO *
2381 activateSpark (rtsSpark spark) 
2382 {
2383   StgTSO *tso;
2384
2385   tso = createSparkThread(spark);
2386   if (RtsFlags.ParFlags.ParStats.Full) {   
2387     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2388       IF_PAR_DEBUG(verbose,
2389                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2390                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2391   }
2392   // ToDo: fwd info on local/global spark to thread -- HWL
2393   // tso->gran.exported =  spark->exported;
2394   // tso->gran.locked =   !spark->global;
2395   // tso->gran.sparkname = spark->name;
2396
2397   return tso;
2398 }
2399 #endif
2400
2401 /* ---------------------------------------------------------------------------
2402  * scheduleThread()
2403  *
2404  * scheduleThread puts a thread on the end  of the runnable queue.
2405  * This will usually be done immediately after a thread is created.
2406  * The caller of scheduleThread must create the thread using e.g.
2407  * createThread and push an appropriate closure
2408  * on this thread's stack before the scheduler is invoked.
2409  * ------------------------------------------------------------------------ */
2410
2411 void
2412 scheduleThread(Capability *cap, StgTSO *tso)
2413 {
2414     // The thread goes at the *end* of the run-queue, to avoid possible
2415     // starvation of any threads already on the queue.
2416     appendToRunQueue(cap,tso);
2417 }
2418
2419 Capability *
2420 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2421 {
2422     Task *task;
2423
2424     // We already created/initialised the Task
2425     task = cap->running_task;
2426
2427     // This TSO is now a bound thread; make the Task and TSO
2428     // point to each other.
2429     tso->bound = task;
2430
2431     task->tso = tso;
2432     task->ret = ret;
2433     task->stat = NoStatus;
2434
2435     appendToRunQueue(cap,tso);
2436
2437     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2438
2439 #if defined(GRAN)
2440     /* GranSim specific init */
2441     CurrentTSO = m->tso;                // the TSO to run
2442     procStatus[MainProc] = Busy;        // status of main PE
2443     CurrentProc = MainProc;             // PE to run it on
2444 #endif
2445
2446     cap = schedule(cap,task);
2447
2448     ASSERT(task->stat != NoStatus);
2449
2450     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2451     return cap;
2452 }
2453
2454 /* ----------------------------------------------------------------------------
2455  * Starting Tasks
2456  * ------------------------------------------------------------------------- */
2457
2458 #if defined(THREADED_RTS)
2459 void
2460 workerStart(Task *task)
2461 {
2462     Capability *cap;
2463
2464     // See startWorkerTask().
2465     ACQUIRE_LOCK(&task->lock);
2466     cap = task->cap;
2467     RELEASE_LOCK(&task->lock);
2468
2469     // set the thread-local pointer to the Task:
2470     taskEnter(task);
2471
2472     // schedule() runs without a lock.
2473     cap = schedule(cap,task);
2474
2475     // On exit from schedule(), we have a Capability.
2476     releaseCapability(cap);
2477     taskStop(task);
2478 }
2479 #endif
2480
2481 /* ---------------------------------------------------------------------------
2482  * initScheduler()
2483  *
2484  * Initialise the scheduler.  This resets all the queues - if the
2485  * queues contained any threads, they'll be garbage collected at the
2486  * next pass.
2487  *
2488  * ------------------------------------------------------------------------ */
2489
2490 void 
2491 initScheduler(void)
2492 {
2493 #if defined(GRAN)
2494   nat i;
2495   for (i=0; i<=MAX_PROC; i++) {
2496     run_queue_hds[i]      = END_TSO_QUEUE;
2497     run_queue_tls[i]      = END_TSO_QUEUE;
2498     blocked_queue_hds[i]  = END_TSO_QUEUE;
2499     blocked_queue_tls[i]  = END_TSO_QUEUE;
2500     ccalling_threadss[i]  = END_TSO_QUEUE;
2501     blackhole_queue[i]    = END_TSO_QUEUE;
2502     sleeping_queue        = END_TSO_QUEUE;
2503   }
2504 #elif !defined(THREADED_RTS)
2505   blocked_queue_hd  = END_TSO_QUEUE;
2506   blocked_queue_tl  = END_TSO_QUEUE;
2507   sleeping_queue    = END_TSO_QUEUE;
2508 #endif
2509
2510   blackhole_queue   = END_TSO_QUEUE;
2511   all_threads       = END_TSO_QUEUE;
2512
2513   context_switch = 0;
2514   interrupted    = 0;
2515
2516   RtsFlags.ConcFlags.ctxtSwitchTicks =
2517       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2518       
2519 #if defined(THREADED_RTS)
2520   /* Initialise the mutex and condition variables used by
2521    * the scheduler. */
2522   initMutex(&sched_mutex);
2523 #endif
2524   
2525   ACQUIRE_LOCK(&sched_mutex);
2526
2527   /* A capability holds the state a native thread needs in
2528    * order to execute STG code. At least one capability is
2529    * floating around (only SMP builds have more than one).
2530    */
2531   initCapabilities();
2532
2533   initTaskManager();
2534
2535 #if defined(SMP)
2536   /*
2537    * Eagerly start one worker to run each Capability, except for
2538    * Capability 0.  The idea is that we're probably going to start a
2539    * bound thread on Capability 0 pretty soon, so we don't want a
2540    * worker task hogging it.
2541    */
2542   { 
2543       nat i;
2544       Capability *cap;
2545       for (i = 1; i < n_capabilities; i++) {
2546           cap = &capabilities[i];
2547           ACQUIRE_LOCK(&cap->lock);
2548           startWorkerTask(cap, workerStart);
2549           RELEASE_LOCK(&cap->lock);
2550       }
2551   }
2552 #endif
2553
2554 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2555   initSparkPools();
2556 #endif
2557
2558   RELEASE_LOCK(&sched_mutex);
2559 }
2560
2561 void
2562 exitScheduler( void )
2563 {
2564     interrupted = rtsTrue;
2565     shutting_down_scheduler = rtsTrue;
2566
2567 #if defined(THREADED_RTS)
2568     { 
2569         Task *task;
2570         nat i;
2571         
2572         ACQUIRE_LOCK(&sched_mutex);
2573         task = newBoundTask();
2574         RELEASE_LOCK(&sched_mutex);
2575
2576         for (i = 0; i < n_capabilities; i++) {
2577             shutdownCapability(&capabilities[i], task);
2578         }
2579         boundTaskExiting(task);
2580         stopTaskManager();
2581     }
2582 #endif
2583 }
2584
2585 /* ---------------------------------------------------------------------------
2586    Where are the roots that we know about?
2587
2588         - all the threads on the runnable queue
2589         - all the threads on the blocked queue
2590         - all the threads on the sleeping queue
2591         - all the thread currently executing a _ccall_GC
2592         - all the "main threads"
2593      
2594    ------------------------------------------------------------------------ */
2595
2596 /* This has to be protected either by the scheduler monitor, or by the
2597         garbage collection monitor (probably the latter).
2598         KH @ 25/10/99
2599 */
2600
2601 void
2602 GetRoots( evac_fn evac )
2603 {
2604     nat i;
2605     Capability *cap;
2606     Task *task;
2607
2608 #if defined(GRAN)
2609     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2610         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2611             evac((StgClosure **)&run_queue_hds[i]);
2612         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2613             evac((StgClosure **)&run_queue_tls[i]);
2614         
2615         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2616             evac((StgClosure **)&blocked_queue_hds[i]);
2617         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2618             evac((StgClosure **)&blocked_queue_tls[i]);
2619         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2620             evac((StgClosure **)&ccalling_threads[i]);
2621     }
2622
2623     markEventQueue();
2624
2625 #else /* !GRAN */
2626
2627     for (i = 0; i < n_capabilities; i++) {
2628         cap = &capabilities[i];
2629         evac((StgClosure **)&cap->run_queue_hd);
2630         evac((StgClosure **)&cap->run_queue_tl);
2631         
2632         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2633              task=task->next) {
2634             evac((StgClosure **)&task->suspended_tso);
2635         }
2636     }
2637     
2638 #if !defined(THREADED_RTS)
2639     evac((StgClosure **)&blocked_queue_hd);
2640     evac((StgClosure **)&blocked_queue_tl);
2641     evac((StgClosure **)&sleeping_queue);
2642 #endif 
2643 #endif
2644
2645     evac((StgClosure **)&blackhole_queue);
2646
2647 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2648     markSparkQueue(evac);
2649 #endif
2650     
2651 #if defined(RTS_USER_SIGNALS)
2652     // mark the signal handlers (signals should be already blocked)
2653     markSignalHandlers(evac);
2654 #endif
2655 }
2656
2657 /* -----------------------------------------------------------------------------
2658    performGC
2659
2660    This is the interface to the garbage collector from Haskell land.
2661    We provide this so that external C code can allocate and garbage
2662    collect when called from Haskell via _ccall_GC.
2663
2664    It might be useful to provide an interface whereby the programmer
2665    can specify more roots (ToDo).
2666    
2667    This needs to be protected by the GC condition variable above.  KH.
2668    -------------------------------------------------------------------------- */
2669
2670 static void (*extra_roots)(evac_fn);
2671
2672 void
2673 performGC(void)
2674 {
2675 #ifdef THREADED_RTS
2676     // ToDo: we have to grab all the capabilities here.
2677     errorBelch("performGC not supported in threaded RTS (yet)");
2678     stg_exit(EXIT_FAILURE);
2679 #endif
2680     /* Obligated to hold this lock upon entry */
2681     GarbageCollect(GetRoots,rtsFalse);
2682 }
2683
2684 void
2685 performMajorGC(void)
2686 {
2687 #ifdef THREADED_RTS
2688     errorBelch("performMayjorGC not supported in threaded RTS (yet)");
2689     stg_exit(EXIT_FAILURE);
2690 #endif
2691     GarbageCollect(GetRoots,rtsTrue);
2692 }
2693
2694 static void
2695 AllRoots(evac_fn evac)
2696 {
2697     GetRoots(evac);             // the scheduler's roots
2698     extra_roots(evac);          // the user's roots
2699 }
2700
2701 void
2702 performGCWithRoots(void (*get_roots)(evac_fn))
2703 {
2704 #ifdef THREADED_RTS
2705     errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
2706     stg_exit(EXIT_FAILURE);
2707 #endif
2708     extra_roots = get_roots;
2709     GarbageCollect(AllRoots,rtsFalse);
2710 }
2711
2712 /* -----------------------------------------------------------------------------
2713    Stack overflow
2714
2715    If the thread has reached its maximum stack size, then raise the
2716    StackOverflow exception in the offending thread.  Otherwise
2717    relocate the TSO into a larger chunk of memory and adjust its stack
2718    size appropriately.
2719    -------------------------------------------------------------------------- */
2720
2721 static StgTSO *
2722 threadStackOverflow(Capability *cap, StgTSO *tso)
2723 {
2724   nat new_stack_size, stack_words;
2725   lnat new_tso_size;
2726   StgPtr new_sp;
2727   StgTSO *dest;
2728
2729   IF_DEBUG(sanity,checkTSO(tso));
2730   if (tso->stack_size >= tso->max_stack_size) {
2731
2732     IF_DEBUG(gc,
2733              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2734                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2735              /* If we're debugging, just print out the top of the stack */
2736              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2737                                               tso->sp+64)));
2738
2739     /* Send this thread the StackOverflow exception */
2740     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2741     return tso;
2742   }
2743
2744   /* Try to double the current stack size.  If that takes us over the
2745    * maximum stack size for this thread, then use the maximum instead.
2746    * Finally round up so the TSO ends up as a whole number of blocks.
2747    */
2748   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2749   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2750                                        TSO_STRUCT_SIZE)/sizeof(W_);
2751   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2752   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2753
2754   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", tso->stack_size, new_stack_size));
2755
2756   dest = (StgTSO *)allocate(new_tso_size);
2757   TICK_ALLOC_TSO(new_stack_size,0);
2758
2759   /* copy the TSO block and the old stack into the new area */
2760   memcpy(dest,tso,TSO_STRUCT_SIZE);
2761   stack_words = tso->stack + tso->stack_size - tso->sp;
2762   new_sp = (P_)dest + new_tso_size - stack_words;
2763   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2764
2765   /* relocate the stack pointers... */
2766   dest->sp         = new_sp;
2767   dest->stack_size = new_stack_size;
2768         
2769   /* Mark the old TSO as relocated.  We have to check for relocated
2770    * TSOs in the garbage collector and any primops that deal with TSOs.
2771    *
2772    * It's important to set the sp value to just beyond the end
2773    * of the stack, so we don't attempt to scavenge any part of the
2774    * dead TSO's stack.
2775    */
2776   tso->what_next = ThreadRelocated;
2777   tso->link = dest;
2778   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2779   tso->why_blocked = NotBlocked;
2780
2781   IF_PAR_DEBUG(verbose,
2782                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2783                      tso->id, tso, tso->stack_size);
2784                /* If we're debugging, just print out the top of the stack */
2785                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2786                                                 tso->sp+64)));
2787   
2788   IF_DEBUG(sanity,checkTSO(tso));
2789 #if 0
2790   IF_DEBUG(scheduler,printTSO(dest));
2791 #endif
2792
2793   return dest;
2794 }
2795
2796 /* ---------------------------------------------------------------------------
2797    Wake up a queue that was blocked on some resource.
2798    ------------------------------------------------------------------------ */
2799
2800 #if defined(GRAN)
2801 STATIC_INLINE void
2802 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2803 {
2804 }
2805 #elif defined(PARALLEL_HASKELL)
2806 STATIC_INLINE void
2807 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2808 {
2809   /* write RESUME events to log file and
2810      update blocked and fetch time (depending on type of the orig closure) */
2811   if (RtsFlags.ParFlags.ParStats.Full) {
2812     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2813                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2814                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2815     if (emptyRunQueue())
2816       emitSchedule = rtsTrue;
2817
2818     switch (get_itbl(node)->type) {
2819         case FETCH_ME_BQ:
2820           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2821           break;
2822         case RBH:
2823         case FETCH_ME:
2824         case BLACKHOLE_BQ:
2825           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2826           break;
2827 #ifdef DIST
2828         case MVAR:
2829           break;
2830 #endif    
2831         default:
2832           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2833         }
2834       }
2835 }
2836 #endif
2837
2838 #if defined(GRAN)
2839 StgBlockingQueueElement *
2840 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2841 {
2842     StgTSO *tso;
2843     PEs node_loc, tso_loc;
2844
2845     node_loc = where_is(node); // should be lifted out of loop
2846     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2847     tso_loc = where_is((StgClosure *)tso);
2848     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2849       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2850       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2851       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2852       // insertThread(tso, node_loc);
2853       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2854                 ResumeThread,
2855                 tso, node, (rtsSpark*)NULL);
2856       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2857       // len_local++;
2858       // len++;
2859     } else { // TSO is remote (actually should be FMBQ)
2860       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2861                                   RtsFlags.GranFlags.Costs.gunblocktime +
2862                                   RtsFlags.GranFlags.Costs.latency;
2863       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2864                 UnblockThread,
2865                 tso, node, (rtsSpark*)NULL);
2866       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2867       // len++;
2868     }
2869     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2870     IF_GRAN_DEBUG(bq,
2871                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2872                           (node_loc==tso_loc ? "Local" : "Global"), 
2873                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2874     tso->block_info.closure = NULL;
2875     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2876                              tso->id, tso));
2877 }
2878 #elif defined(PARALLEL_HASKELL)
2879 StgBlockingQueueElement *
2880 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2881 {
2882     StgBlockingQueueElement *next;
2883
2884     switch (get_itbl(bqe)->type) {
2885     case TSO:
2886       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2887       /* if it's a TSO just push it onto the run_queue */
2888       next = bqe->link;
2889       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2890       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
2891       threadRunnable();
2892       unblockCount(bqe, node);
2893       /* reset blocking status after dumping event */
2894       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2895       break;
2896
2897     case BLOCKED_FETCH:
2898       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2899       next = bqe->link;
2900       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2901       PendingFetches = (StgBlockedFetch *)bqe;
2902       break;
2903
2904 # if defined(DEBUG)
2905       /* can ignore this case in a non-debugging setup; 
2906          see comments on RBHSave closures above */
2907     case CONSTR:
2908       /* check that the closure is an RBHSave closure */
2909       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2910              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2911              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2912       break;
2913
2914     default:
2915       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2916            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2917            (StgClosure *)bqe);
2918 # endif
2919     }
2920   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
2921   return next;
2922 }
2923 #endif
2924
2925 StgTSO *
2926 unblockOne(Capability *cap, StgTSO *tso)
2927 {
2928   StgTSO *next;
2929
2930   ASSERT(get_itbl(tso)->type == TSO);
2931   ASSERT(tso->why_blocked != NotBlocked);
2932   tso->why_blocked = NotBlocked;
2933   next = tso->link;
2934   tso->link = END_TSO_QUEUE;
2935
2936   // We might have just migrated this TSO to our Capability:
2937   if (tso->bound) {
2938       tso->bound->cap = cap;
2939   }
2940
2941   appendToRunQueue(cap,tso);
2942
2943   // we're holding a newly woken thread, make sure we context switch
2944   // quickly so we can migrate it if necessary.
2945   context_switch = 1;
2946   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
2947   return next;
2948 }
2949
2950
2951 #if defined(GRAN)
2952 void 
2953 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2954 {
2955   StgBlockingQueueElement *bqe;
2956   PEs node_loc;
2957   nat len = 0; 
2958
2959   IF_GRAN_DEBUG(bq, 
2960                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
2961                       node, CurrentProc, CurrentTime[CurrentProc], 
2962                       CurrentTSO->id, CurrentTSO));
2963
2964   node_loc = where_is(node);
2965
2966   ASSERT(q == END_BQ_QUEUE ||
2967          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2968          get_itbl(q)->type == CONSTR); // closure (type constructor)
2969   ASSERT(is_unique(node));
2970
2971   /* FAKE FETCH: magically copy the node to the tso's proc;
2972      no Fetch necessary because in reality the node should not have been 
2973      moved to the other PE in the first place
2974   */
2975   if (CurrentProc!=node_loc) {
2976     IF_GRAN_DEBUG(bq, 
2977                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
2978                         node, node_loc, CurrentProc, CurrentTSO->id, 
2979                         // CurrentTSO, where_is(CurrentTSO),
2980                         node->header.gran.procs));
2981     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2982     IF_GRAN_DEBUG(bq, 
2983                   debugBelch("## new bitmask of node %p is %#x\n",
2984                         node, node->header.gran.procs));
2985     if (RtsFlags.GranFlags.GranSimStats.Global) {
2986       globalGranStats.tot_fake_fetches++;
2987     }
2988   }
2989
2990   bqe = q;
2991   // ToDo: check: ASSERT(CurrentProc==node_loc);
2992   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2993     //next = bqe->link;
2994     /* 
2995        bqe points to the current element in the queue
2996        next points to the next element in the queue
2997     */
2998     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2999     //tso_loc = where_is(tso);
3000     len++;
3001     bqe = unblockOne(bqe, node);
3002   }
3003
3004   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3005      the closure to make room for the anchor of the BQ */
3006   if (bqe!=END_BQ_QUEUE) {
3007     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3008     /*
3009     ASSERT((info_ptr==&RBH_Save_0_info) ||
3010            (info_ptr==&RBH_Save_1_info) ||
3011            (info_ptr==&RBH_Save_2_info));
3012     */
3013     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3014     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3015     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3016
3017     IF_GRAN_DEBUG(bq,
3018                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3019                         node, info_type(node)));
3020   }
3021
3022   /* statistics gathering */
3023   if (RtsFlags.GranFlags.GranSimStats.Global) {
3024     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3025     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3026     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3027     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3028   }
3029   IF_GRAN_DEBUG(bq,
3030                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3031                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3032 }
3033 #elif defined(PARALLEL_HASKELL)
3034 void 
3035 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3036 {
3037   StgBlockingQueueElement *bqe;
3038
3039   IF_PAR_DEBUG(verbose, 
3040                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3041                      node, mytid));
3042 #ifdef DIST  
3043   //RFP
3044   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3045     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3046     return;
3047   }
3048 #endif
3049   
3050   ASSERT(q == END_BQ_QUEUE ||
3051          get_itbl(q)->type == TSO ||           
3052          get_itbl(q)->type == BLOCKED_FETCH || 
3053          get_itbl(q)->type == CONSTR); 
3054
3055   bqe = q;
3056   while (get_itbl(bqe)->type==TSO || 
3057          get_itbl(bqe)->type==BLOCKED_FETCH) {
3058     bqe = unblockOne(bqe, node);
3059   }
3060 }
3061
3062 #else   /* !GRAN && !PARALLEL_HASKELL */
3063
3064 void
3065 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3066 {
3067     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3068                              // Exception.cmm
3069     while (tso != END_TSO_QUEUE) {
3070         tso = unblockOne(cap,tso);
3071     }
3072 }
3073 #endif
3074
3075 /* ---------------------------------------------------------------------------
3076    Interrupt execution
3077    - usually called inside a signal handler so it mustn't do anything fancy.   
3078    ------------------------------------------------------------------------ */
3079
3080 void
3081 interruptStgRts(void)
3082 {
3083     interrupted    = 1;
3084     context_switch = 1;
3085 #if defined(THREADED_RTS)
3086     prodAllCapabilities();
3087 #endif
3088 }
3089
3090 /* -----------------------------------------------------------------------------
3091    Unblock a thread
3092
3093    This is for use when we raise an exception in another thread, which
3094    may be blocked.
3095    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3096    -------------------------------------------------------------------------- */
3097
3098 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3099 /*
3100   NB: only the type of the blocking queue is different in GranSim and GUM
3101       the operations on the queue-elements are the same
3102       long live polymorphism!
3103
3104   Locks: sched_mutex is held upon entry and exit.
3105
3106 */
3107 static void
3108 unblockThread(Capability *cap, StgTSO *tso)
3109 {
3110   StgBlockingQueueElement *t, **last;
3111
3112   switch (tso->why_blocked) {
3113
3114   case NotBlocked:
3115     return;  /* not blocked */
3116
3117   case BlockedOnSTM:
3118     // Be careful: nothing to do here!  We tell the scheduler that the thread
3119     // is runnable and we leave it to the stack-walking code to abort the 
3120     // transaction while unwinding the stack.  We should perhaps have a debugging
3121     // test to make sure that this really happens and that the 'zombie' transaction
3122     // does not get committed.
3123     goto done;
3124
3125   case BlockedOnMVar:
3126     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3127     {
3128       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3129       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3130
3131       last = (StgBlockingQueueElement **)&mvar->head;
3132       for (t = (StgBlockingQueueElement *)mvar->head; 
3133            t != END_BQ_QUEUE; 
3134            last = &t->link, last_tso = t, t = t->link) {
3135         if (t == (StgBlockingQueueElement *)tso) {
3136           *last = (StgBlockingQueueElement *)tso->link;
3137           if (mvar->tail == tso) {
3138             mvar->tail = (StgTSO *)last_tso;
3139           }
3140           goto done;
3141         }
3142       }
3143       barf("unblockThread (MVAR): TSO not found");
3144     }
3145
3146   case BlockedOnBlackHole:
3147     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3148     {
3149       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3150
3151       last = &bq->blocking_queue;
3152       for (t = bq->blocking_queue; 
3153            t != END_BQ_QUEUE; 
3154            last = &t->link, t = t->link) {
3155         if (t == (StgBlockingQueueElement *)tso) {
3156           *last = (StgBlockingQueueElement *)tso->link;
3157           goto done;
3158         }
3159       }
3160       barf("unblockThread (BLACKHOLE): TSO not found");
3161     }
3162
3163   case BlockedOnException:
3164     {
3165       StgTSO *target  = tso->block_info.tso;
3166
3167       ASSERT(get_itbl(target)->type == TSO);
3168
3169       if (target->what_next == ThreadRelocated) {
3170           target = target->link;
3171           ASSERT(get_itbl(target)->type == TSO);
3172       }
3173
3174       ASSERT(target->blocked_exceptions != NULL);
3175
3176       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3177       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3178            t != END_BQ_QUEUE; 
3179            last = &t->link, t = t->link) {
3180         ASSERT(get_itbl(t)->type == TSO);
3181         if (t == (StgBlockingQueueElement *)tso) {
3182           *last = (StgBlockingQueueElement *)tso->link;
3183           goto done;
3184         }
3185       }
3186       barf("unblockThread (Exception): TSO not found");
3187     }
3188
3189   case BlockedOnRead:
3190   case BlockedOnWrite:
3191 #if defined(mingw32_HOST_OS)
3192   case BlockedOnDoProc:
3193 #endif
3194     {
3195       /* take TSO off blocked_queue */
3196       StgBlockingQueueElement *prev = NULL;
3197       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3198            prev = t, t = t->link) {
3199         if (t == (StgBlockingQueueElement *)tso) {
3200           if (prev == NULL) {
3201             blocked_queue_hd = (StgTSO *)t->link;
3202             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3203               blocked_queue_tl = END_TSO_QUEUE;
3204             }
3205           } else {
3206             prev->link = t->link;
3207             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3208               blocked_queue_tl = (StgTSO *)prev;
3209             }
3210           }
3211 #if defined(mingw32_HOST_OS)
3212           /* (Cooperatively) signal that the worker thread should abort
3213            * the request.
3214            */
3215           abandonWorkRequest(tso->block_info.async_result->reqID);
3216 #endif
3217           goto done;
3218         }
3219       }
3220       barf("unblockThread (I/O): TSO not found");
3221     }
3222
3223   case BlockedOnDelay:
3224     {
3225       /* take TSO off sleeping_queue */
3226       StgBlockingQueueElement *prev = NULL;
3227       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3228            prev = t, t = t->link) {
3229         if (t == (StgBlockingQueueElement *)tso) {
3230           if (prev == NULL) {
3231             sleeping_queue = (StgTSO *)t->link;
3232           } else {
3233             prev->link = t->link;
3234           }
3235           goto done;
3236         }
3237       }
3238       barf("unblockThread (delay): TSO not found");
3239     }
3240
3241   default:
3242     barf("unblockThread");
3243   }
3244
3245  done:
3246   tso->link = END_TSO_QUEUE;
3247   tso->why_blocked = NotBlocked;
3248   tso->block_info.closure = NULL;
3249   pushOnRunQueue(cap,tso);
3250 }
3251 #else
3252 static void
3253 unblockThread(Capability *cap, StgTSO *tso)
3254 {
3255   StgTSO *t, **last;
3256   
3257   /* To avoid locking unnecessarily. */
3258   if (tso->why_blocked == NotBlocked) {
3259     return;
3260   }
3261
3262   switch (tso->why_blocked) {
3263
3264   case BlockedOnSTM:
3265     // Be careful: nothing to do here!  We tell the scheduler that the thread
3266     // is runnable and we leave it to the stack-walking code to abort the 
3267     // transaction while unwinding the stack.  We should perhaps have a debugging
3268     // test to make sure that this really happens and that the 'zombie' transaction
3269     // does not get committed.
3270     goto done;
3271
3272   case BlockedOnMVar:
3273     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3274     {
3275       StgTSO *last_tso = END_TSO_QUEUE;
3276       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3277
3278       last = &mvar->head;
3279       for (t = mvar->head; t != END_TSO_QUEUE; 
3280            last = &t->link, last_tso = t, t = t->link) {
3281         if (t == tso) {
3282           *last = tso->link;
3283           if (mvar->tail == tso) {
3284             mvar->tail = last_tso;
3285           }
3286           goto done;
3287         }
3288       }
3289       barf("unblockThread (MVAR): TSO not found");
3290     }
3291
3292   case BlockedOnBlackHole:
3293     {
3294       last = &blackhole_queue;
3295       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3296            last = &t->link, t = t->link) {
3297         if (t == tso) {
3298           *last = tso->link;
3299           goto done;
3300         }
3301       }
3302       barf("unblockThread (BLACKHOLE): TSO not found");
3303     }
3304
3305   case BlockedOnException:
3306     {
3307       StgTSO *target  = tso->block_info.tso;
3308
3309       ASSERT(get_itbl(target)->type == TSO);
3310
3311       while (target->what_next == ThreadRelocated) {
3312           target = target->link;
3313           ASSERT(get_itbl(target)->type == TSO);
3314       }
3315       
3316       ASSERT(target->blocked_exceptions != NULL);
3317
3318       last = &target->blocked_exceptions;
3319       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3320            last = &t->link, t = t->link) {
3321         ASSERT(get_itbl(t)->type == TSO);
3322         if (t == tso) {
3323           *last = tso->link;
3324           goto done;
3325         }
3326       }
3327       barf("unblockThread (Exception): TSO not found");
3328     }
3329
3330 #if !defined(THREADED_RTS)
3331   case BlockedOnRead:
3332   case BlockedOnWrite:
3333 #if defined(mingw32_HOST_OS)
3334   case BlockedOnDoProc:
3335 #endif
3336     {
3337       StgTSO *prev = NULL;
3338       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3339            prev = t, t = t->link) {
3340         if (t == tso) {
3341           if (prev == NULL) {
3342             blocked_queue_hd = t->link;
3343             if (blocked_queue_tl == t) {
3344               blocked_queue_tl = END_TSO_QUEUE;
3345             }
3346           } else {
3347             prev->link = t->link;
3348             if (blocked_queue_tl == t) {
3349               blocked_queue_tl = prev;
3350             }
3351           }
3352 #if defined(mingw32_HOST_OS)
3353           /* (Cooperatively) signal that the worker thread should abort
3354            * the request.
3355            */
3356           abandonWorkRequest(tso->block_info.async_result->reqID);
3357 #endif
3358           goto done;
3359         }
3360       }
3361       barf("unblockThread (I/O): TSO not found");
3362     }
3363
3364   case BlockedOnDelay:
3365     {
3366       StgTSO *prev = NULL;
3367       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3368            prev = t, t = t->link) {
3369         if (t == tso) {
3370           if (prev == NULL) {
3371             sleeping_queue = t->link;
3372           } else {
3373             prev->link = t->link;
3374           }
3375           goto done;
3376         }
3377       }
3378       barf("unblockThread (delay): TSO not found");
3379     }
3380 #endif
3381
3382   default:
3383     barf("unblockThread");
3384   }
3385
3386  done:
3387   tso->link = END_TSO_QUEUE;
3388   tso->why_blocked = NotBlocked;
3389   tso->block_info.closure = NULL;
3390   appendToRunQueue(cap,tso);
3391 }
3392 #endif
3393
3394 /* -----------------------------------------------------------------------------
3395  * checkBlackHoles()
3396  *
3397  * Check the blackhole_queue for threads that can be woken up.  We do
3398  * this periodically: before every GC, and whenever the run queue is
3399  * empty.
3400  *
3401  * An elegant solution might be to just wake up all the blocked
3402  * threads with awakenBlockedQueue occasionally: they'll go back to
3403  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3404  * doesn't give us a way to tell whether we've actually managed to
3405  * wake up any threads, so we would be busy-waiting.
3406  *
3407  * -------------------------------------------------------------------------- */
3408
3409 static rtsBool
3410 checkBlackHoles (Capability *cap)
3411 {
3412     StgTSO **prev, *t;
3413     rtsBool any_woke_up = rtsFalse;
3414     StgHalfWord type;
3415
3416     // blackhole_queue is global:
3417     ASSERT_LOCK_HELD(&sched_mutex);
3418
3419     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3420
3421     // ASSUMES: sched_mutex
3422     prev = &blackhole_queue;
3423     t = blackhole_queue;
3424     while (t != END_TSO_QUEUE) {
3425         ASSERT(t->why_blocked == BlockedOnBlackHole);
3426         type = get_itbl(t->block_info.closure)->type;
3427         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3428             IF_DEBUG(sanity,checkTSO(t));
3429             t = unblockOne(cap, t);
3430             // urk, the threads migrate to the current capability
3431             // here, but we'd like to keep them on the original one.
3432             *prev = t;
3433             any_woke_up = rtsTrue;
3434         } else {
3435             prev = &t->link;
3436             t = t->link;
3437         }
3438     }
3439
3440     return any_woke_up;
3441 }
3442
3443 /* -----------------------------------------------------------------------------
3444  * raiseAsync()
3445  *
3446  * The following function implements the magic for raising an
3447  * asynchronous exception in an existing thread.
3448  *
3449  * We first remove the thread from any queue on which it might be
3450  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3451  *
3452  * We strip the stack down to the innermost CATCH_FRAME, building
3453  * thunks in the heap for all the active computations, so they can 
3454  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3455  * an application of the handler to the exception, and push it on
3456  * the top of the stack.
3457  * 
3458  * How exactly do we save all the active computations?  We create an
3459  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3460  * AP_STACKs pushes everything from the corresponding update frame
3461  * upwards onto the stack.  (Actually, it pushes everything up to the
3462  * next update frame plus a pointer to the next AP_STACK object.
3463  * Entering the next AP_STACK object pushes more onto the stack until we
3464  * reach the last AP_STACK object - at which point the stack should look
3465  * exactly as it did when we killed the TSO and we can continue
3466  * execution by entering the closure on top of the stack.
3467  *
3468  * We can also kill a thread entirely - this happens if either (a) the 
3469  * exception passed to raiseAsync is NULL, or (b) there's no
3470  * CATCH_FRAME on the stack.  In either case, we strip the entire
3471  * stack and replace the thread with a zombie.
3472  *
3473  * ToDo: in SMP mode, this function is only safe if either (a) we hold
3474  * all the Capabilities (eg. in GC), or (b) we own the Capability that
3475  * the TSO is currently blocked on or on the run queue of.
3476  *
3477  * -------------------------------------------------------------------------- */
3478  
3479 void
3480 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3481 {
3482     raiseAsync_(cap, tso, exception, rtsFalse);
3483 }
3484
3485 static void
3486 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3487             rtsBool stop_at_atomically)
3488 {
3489     StgRetInfoTable *info;
3490     StgPtr sp;
3491   
3492     // Thread already dead?
3493     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3494         return;
3495     }
3496
3497     IF_DEBUG(scheduler, 
3498              sched_belch("raising exception in thread %ld.", (long)tso->id));
3499     
3500     // Remove it from any blocking queues
3501     unblockThread(cap,tso);
3502
3503     sp = tso->sp;
3504     
3505     // The stack freezing code assumes there's a closure pointer on
3506     // the top of the stack, so we have to arrange that this is the case...
3507     //
3508     if (sp[0] == (W_)&stg_enter_info) {
3509         sp++;
3510     } else {
3511         sp--;
3512         sp[0] = (W_)&stg_dummy_ret_closure;
3513     }
3514
3515     while (1) {
3516         nat i;
3517
3518         // 1. Let the top of the stack be the "current closure"
3519         //
3520         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3521         // CATCH_FRAME.
3522         //
3523         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3524         // current closure applied to the chunk of stack up to (but not
3525         // including) the update frame.  This closure becomes the "current
3526         // closure".  Go back to step 2.
3527         //
3528         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3529         // top of the stack applied to the exception.
3530         // 
3531         // 5. If it's a STOP_FRAME, then kill the thread.
3532         // 
3533         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3534         // transaction
3535        
3536         
3537         StgPtr frame;
3538         
3539         frame = sp + 1;
3540         info = get_ret_itbl((StgClosure *)frame);
3541         
3542         while (info->i.type != UPDATE_FRAME
3543                && (info->i.type != CATCH_FRAME || exception == NULL)
3544                && info->i.type != STOP_FRAME
3545                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3546         {
3547             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3548               // IF we find an ATOMICALLY_FRAME then we abort the
3549               // current transaction and propagate the exception.  In
3550               // this case (unlike ordinary exceptions) we do not care
3551               // whether the transaction is valid or not because its
3552               // possible validity cannot have caused the exception
3553               // and will not be visible after the abort.
3554               IF_DEBUG(stm,
3555                        debugBelch("Found atomically block delivering async exception\n"));
3556               stmAbortTransaction(tso -> trec);
3557               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3558             }
3559             frame += stack_frame_sizeW((StgClosure *)frame);
3560             info = get_ret_itbl((StgClosure *)frame);
3561         }
3562         
3563         switch (info->i.type) {
3564             
3565         case ATOMICALLY_FRAME:
3566             ASSERT(stop_at_atomically);
3567             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3568             stmCondemnTransaction(tso -> trec);
3569 #ifdef REG_R1
3570             tso->sp = frame;
3571 #else
3572             // R1 is not a register: the return convention for IO in
3573             // this case puts the return value on the stack, so we
3574             // need to set up the stack to return to the atomically
3575             // frame properly...
3576             tso->sp = frame - 2;
3577             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3578             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3579 #endif
3580             tso->what_next = ThreadRunGHC;
3581             return;
3582
3583         case CATCH_FRAME:
3584             // If we find a CATCH_FRAME, and we've got an exception to raise,
3585             // then build the THUNK raise(exception), and leave it on
3586             // top of the CATCH_FRAME ready to enter.
3587             //
3588         {
3589 #ifdef PROFILING
3590             StgCatchFrame *cf = (StgCatchFrame *)frame;
3591 #endif
3592             StgThunk *raise;
3593             
3594             // we've got an exception to raise, so let's pass it to the
3595             // handler in this frame.
3596             //
3597             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3598             TICK_ALLOC_SE_THK(1,0);
3599             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3600             raise->payload[0] = exception;
3601             
3602             // throw away the stack from Sp up to the CATCH_FRAME.
3603             //
3604             sp = frame - 1;
3605             
3606             /* Ensure that async excpetions are blocked now, so we don't get
3607              * a surprise exception before we get around to executing the
3608              * handler.
3609              */
3610             if (tso->blocked_exceptions == NULL) {
3611                 tso->blocked_exceptions = END_TSO_QUEUE;
3612             }
3613             
3614             /* Put the newly-built THUNK on top of the stack, ready to execute
3615              * when the thread restarts.
3616              */
3617             sp[0] = (W_)raise;
3618             sp[-1] = (W_)&stg_enter_info;
3619             tso->sp = sp-1;
3620             tso->what_next = ThreadRunGHC;
3621             IF_DEBUG(sanity, checkTSO(tso));
3622             return;
3623         }
3624         
3625         case UPDATE_FRAME:
3626         {
3627             StgAP_STACK * ap;
3628             nat words;
3629             
3630             // First build an AP_STACK consisting of the stack chunk above the
3631             // current update frame, with the top word on the stack as the
3632             // fun field.
3633             //
3634             words = frame - sp - 1;
3635             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3636             
3637             ap->size = words;
3638             ap->fun  = (StgClosure *)sp[0];
3639             sp++;
3640             for(i=0; i < (nat)words; ++i) {
3641                 ap->payload[i] = (StgClosure *)*sp++;
3642             }
3643             
3644             SET_HDR(ap,&stg_AP_STACK_info,
3645                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3646             TICK_ALLOC_UP_THK(words+1,0);
3647             
3648             IF_DEBUG(scheduler,
3649                      debugBelch("sched: Updating ");
3650                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3651                      debugBelch(" with ");
3652                      printObj((StgClosure *)ap);
3653                 );
3654
3655             // Replace the updatee with an indirection - happily
3656             // this will also wake up any threads currently
3657             // waiting on the result.
3658             //
3659             // Warning: if we're in a loop, more than one update frame on
3660             // the stack may point to the same object.  Be careful not to
3661             // overwrite an IND_OLDGEN in this case, because we'll screw
3662             // up the mutable lists.  To be on the safe side, don't
3663             // overwrite any kind of indirection at all.  See also
3664             // threadSqueezeStack in GC.c, where we have to make a similar
3665             // check.
3666             //
3667             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3668                 // revert the black hole
3669                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3670                                (StgClosure *)ap);
3671             }
3672             sp += sizeofW(StgUpdateFrame) - 1;
3673             sp[0] = (W_)ap; // push onto stack
3674             break;
3675         }
3676         
3677         case STOP_FRAME:
3678             // We've stripped the entire stack, the thread is now dead.
3679             sp += sizeofW(StgStopFrame);
3680             tso->what_next = ThreadKilled;
3681             tso->sp = sp;
3682             return;
3683             
3684         default:
3685             barf("raiseAsync");
3686         }
3687     }
3688     barf("raiseAsync");
3689 }
3690
3691 /* -----------------------------------------------------------------------------
3692    Deleting threads
3693
3694    This is used for interruption (^C) and forking, and corresponds to
3695    raising an exception but without letting the thread catch the
3696    exception.
3697    -------------------------------------------------------------------------- */
3698
3699 static void 
3700 deleteThread (Capability *cap, StgTSO *tso)
3701 {
3702   if (tso->why_blocked != BlockedOnCCall &&
3703       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3704       raiseAsync(cap,tso,NULL);
3705   }
3706 }
3707
3708 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3709 static void 
3710 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3711 { // for forkProcess only:
3712   // delete thread without giving it a chance to catch the KillThread exception
3713
3714   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3715       return;
3716   }
3717
3718   if (tso->why_blocked != BlockedOnCCall &&
3719       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3720       unblockThread(cap,tso);
3721   }
3722
3723   tso->what_next = ThreadKilled;
3724 }
3725 #endif
3726
3727 /* -----------------------------------------------------------------------------
3728    raiseExceptionHelper
3729    
3730    This function is called by the raise# primitve, just so that we can
3731    move some of the tricky bits of raising an exception from C-- into
3732    C.  Who knows, it might be a useful re-useable thing here too.
3733    -------------------------------------------------------------------------- */
3734
3735 StgWord
3736 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3737 {
3738     Capability *cap = regTableToCapability(reg);
3739     StgThunk *raise_closure = NULL;
3740     StgPtr p, next;
3741     StgRetInfoTable *info;
3742     //
3743     // This closure represents the expression 'raise# E' where E
3744     // is the exception raise.  It is used to overwrite all the
3745     // thunks which are currently under evaluataion.
3746     //
3747
3748     //    
3749     // LDV profiling: stg_raise_info has THUNK as its closure
3750     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3751     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3752     // 1 does not cause any problem unless profiling is performed.
3753     // However, when LDV profiling goes on, we need to linearly scan
3754     // small object pool, where raise_closure is stored, so we should
3755     // use MIN_UPD_SIZE.
3756     //
3757     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3758     //                                 sizeofW(StgClosure)+1);
3759     //
3760
3761     //
3762     // Walk up the stack, looking for the catch frame.  On the way,
3763     // we update any closures pointed to from update frames with the
3764     // raise closure that we just built.
3765     //
3766     p = tso->sp;
3767     while(1) {
3768         info = get_ret_itbl((StgClosure *)p);
3769         next = p + stack_frame_sizeW((StgClosure *)p);
3770         switch (info->i.type) {
3771             
3772         case UPDATE_FRAME:
3773             // Only create raise_closure if we need to.
3774             if (raise_closure == NULL) {
3775                 raise_closure = 
3776                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3777                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3778                 raise_closure->payload[0] = exception;
3779             }
3780             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3781             p = next;
3782             continue;
3783
3784         case ATOMICALLY_FRAME:
3785             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3786             tso->sp = p;
3787             return ATOMICALLY_FRAME;
3788             
3789         case CATCH_FRAME:
3790             tso->sp = p;
3791             return CATCH_FRAME;
3792
3793         case CATCH_STM_FRAME:
3794             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3795             tso->sp = p;
3796             return CATCH_STM_FRAME;
3797             
3798         case STOP_FRAME:
3799             tso->sp = p;
3800             return STOP_FRAME;
3801
3802         case CATCH_RETRY_FRAME:
3803         default:
3804             p = next; 
3805             continue;
3806         }
3807     }
3808 }
3809
3810
3811 /* -----------------------------------------------------------------------------
3812    findRetryFrameHelper
3813
3814    This function is called by the retry# primitive.  It traverses the stack
3815    leaving tso->sp referring to the frame which should handle the retry.  
3816
3817    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3818    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3819
3820    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3821    despite the similar implementation.
3822
3823    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3824    not be created within memory transactions.
3825    -------------------------------------------------------------------------- */
3826
3827 StgWord
3828 findRetryFrameHelper (StgTSO *tso)
3829 {
3830   StgPtr           p, next;
3831   StgRetInfoTable *info;
3832
3833   p = tso -> sp;
3834   while (1) {
3835     info = get_ret_itbl((StgClosure *)p);
3836     next = p + stack_frame_sizeW((StgClosure *)p);
3837     switch (info->i.type) {
3838       
3839     case ATOMICALLY_FRAME:
3840       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3841       tso->sp = p;
3842       return ATOMICALLY_FRAME;
3843       
3844     case CATCH_RETRY_FRAME:
3845       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3846       tso->sp = p;
3847       return CATCH_RETRY_FRAME;
3848       
3849     case CATCH_STM_FRAME:
3850     default:
3851       ASSERT(info->i.type != CATCH_FRAME);
3852       ASSERT(info->i.type != STOP_FRAME);
3853       p = next; 
3854       continue;
3855     }
3856   }
3857 }
3858
3859 /* -----------------------------------------------------------------------------
3860    resurrectThreads is called after garbage collection on the list of
3861    threads found to be garbage.  Each of these threads will be woken
3862    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3863    on an MVar, or NonTermination if the thread was blocked on a Black
3864    Hole.
3865
3866    Locks: assumes we hold *all* the capabilities.
3867    -------------------------------------------------------------------------- */
3868
3869 void
3870 resurrectThreads (StgTSO *threads)
3871 {
3872     StgTSO *tso, *next;
3873     Capability *cap;
3874
3875     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3876         next = tso->global_link;
3877         tso->global_link = all_threads;
3878         all_threads = tso;
3879         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3880         
3881         // Wake up the thread on the Capability it was last on for a
3882         // bound thread, or last_free_capability otherwise.
3883         if (tso->bound) {
3884             cap = tso->bound->cap;
3885         } else {
3886             cap = last_free_capability;
3887         }
3888         
3889         switch (tso->why_blocked) {
3890         case BlockedOnMVar:
3891         case BlockedOnException:
3892             /* Called by GC - sched_mutex lock is currently held. */
3893             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
3894             break;
3895         case BlockedOnBlackHole:
3896             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
3897             break;
3898         case BlockedOnSTM:
3899             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
3900             break;
3901         case NotBlocked:
3902             /* This might happen if the thread was blocked on a black hole
3903              * belonging to a thread that we've just woken up (raiseAsync
3904              * can wake up threads, remember...).
3905              */
3906             continue;
3907         default:
3908             barf("resurrectThreads: thread blocked in a strange way");
3909         }
3910     }
3911 }
3912
3913 /* ----------------------------------------------------------------------------
3914  * Debugging: why is a thread blocked
3915  * [Also provides useful information when debugging threaded programs
3916  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3917    ------------------------------------------------------------------------- */
3918
3919 #if DEBUG
3920 static void
3921 printThreadBlockage(StgTSO *tso)
3922 {
3923   switch (tso->why_blocked) {
3924   case BlockedOnRead:
3925     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
3926     break;
3927   case BlockedOnWrite:
3928     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
3929     break;
3930 #if defined(mingw32_HOST_OS)
3931     case BlockedOnDoProc:
3932     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
3933     break;
3934 #endif
3935   case BlockedOnDelay:
3936     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
3937     break;
3938   case BlockedOnMVar:
3939     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
3940     break;
3941   case BlockedOnException:
3942     debugBelch("is blocked on delivering an exception to thread %d",
3943             tso->block_info.tso->id);
3944     break;
3945   case BlockedOnBlackHole:
3946     debugBelch("is blocked on a black hole");
3947     break;
3948   case NotBlocked:
3949     debugBelch("is not blocked");
3950     break;
3951 #if defined(PARALLEL_HASKELL)
3952   case BlockedOnGA:
3953     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
3954             tso->block_info.closure, info_type(tso->block_info.closure));
3955     break;
3956   case BlockedOnGA_NoSend:
3957     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
3958             tso->block_info.closure, info_type(tso->block_info.closure));
3959     break;
3960 #endif
3961   case BlockedOnCCall:
3962     debugBelch("is blocked on an external call");
3963     break;
3964   case BlockedOnCCall_NoUnblockExc:
3965     debugBelch("is blocked on an external call (exceptions were already blocked)");
3966     break;
3967   case BlockedOnSTM:
3968     debugBelch("is blocked on an STM operation");
3969     break;
3970   default:
3971     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3972          tso->why_blocked, tso->id, tso);
3973   }
3974 }
3975
3976 static void
3977 printThreadStatus(StgTSO *tso)
3978 {
3979   switch (tso->what_next) {
3980   case ThreadKilled:
3981     debugBelch("has been killed");
3982     break;
3983   case ThreadComplete:
3984     debugBelch("has completed");
3985     break;
3986   default:
3987     printThreadBlockage(tso);
3988   }
3989 }
3990
3991 void
3992 printAllThreads(void)
3993 {
3994   StgTSO *t;
3995
3996 # if defined(GRAN)
3997   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3998   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3999                        time_string, rtsFalse/*no commas!*/);
4000
4001   debugBelch("all threads at [%s]:\n", time_string);
4002 # elif defined(PARALLEL_HASKELL)
4003   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4004   ullong_format_string(CURRENT_TIME,
4005                        time_string, rtsFalse/*no commas!*/);
4006
4007   debugBelch("all threads at [%s]:\n", time_string);
4008 # else
4009   debugBelch("all threads:\n");
4010 # endif
4011
4012   for (t = all_threads; t != END_TSO_QUEUE; ) {
4013     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4014     {
4015       void *label = lookupThreadLabel(t->id);
4016       if (label) debugBelch("[\"%s\"] ",(char *)label);
4017     }
4018     if (t->what_next == ThreadRelocated) {
4019         debugBelch("has been relocated...\n");
4020         t = t->link;
4021     } else {
4022         printThreadStatus(t);
4023         debugBelch("\n");
4024         t = t->global_link;
4025     }
4026   }
4027 }
4028
4029 // useful from gdb
4030 void 
4031 printThreadQueue(StgTSO *t)
4032 {
4033     nat i = 0;
4034     for (; t != END_TSO_QUEUE; t = t->link) {
4035         debugBelch("\tthread %d @ %p ", t->id, (void *)t);
4036         if (t->what_next == ThreadRelocated) {
4037             debugBelch("has been relocated...\n");
4038         } else {
4039             printThreadStatus(t);
4040             debugBelch("\n");
4041         }
4042         i++;
4043     }
4044     debugBelch("%d threads on queue\n", i);
4045 }
4046
4047 /* 
4048    Print a whole blocking queue attached to node (debugging only).
4049 */
4050 # if defined(PARALLEL_HASKELL)
4051 void 
4052 print_bq (StgClosure *node)
4053 {
4054   StgBlockingQueueElement *bqe;
4055   StgTSO *tso;
4056   rtsBool end;
4057
4058   debugBelch("## BQ of closure %p (%s): ",
4059           node, info_type(node));
4060
4061   /* should cover all closures that may have a blocking queue */
4062   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4063          get_itbl(node)->type == FETCH_ME_BQ ||
4064          get_itbl(node)->type == RBH ||
4065          get_itbl(node)->type == MVAR);
4066     
4067   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4068
4069   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4070 }
4071
4072 /* 
4073    Print a whole blocking queue starting with the element bqe.
4074 */
4075 void 
4076 print_bqe (StgBlockingQueueElement *bqe)
4077 {
4078   rtsBool end;
4079
4080   /* 
4081      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4082   */
4083   for (end = (bqe==END_BQ_QUEUE);
4084        !end; // iterate until bqe points to a CONSTR
4085        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4086        bqe = end ? END_BQ_QUEUE : bqe->link) {
4087     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4088     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4089     /* types of closures that may appear in a blocking queue */
4090     ASSERT(get_itbl(bqe)->type == TSO ||           
4091            get_itbl(bqe)->type == BLOCKED_FETCH || 
4092            get_itbl(bqe)->type == CONSTR); 
4093     /* only BQs of an RBH end with an RBH_Save closure */
4094     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4095
4096     switch (get_itbl(bqe)->type) {
4097     case TSO:
4098       debugBelch(" TSO %u (%x),",
4099               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4100       break;
4101     case BLOCKED_FETCH:
4102       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4103               ((StgBlockedFetch *)bqe)->node, 
4104               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4105               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4106               ((StgBlockedFetch *)bqe)->ga.weight);
4107       break;
4108     case CONSTR:
4109       debugBelch(" %s (IP %p),",
4110               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4111                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4112                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4113                "RBH_Save_?"), get_itbl(bqe));
4114       break;
4115     default:
4116       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4117            info_type((StgClosure *)bqe)); // , node, info_type(node));
4118       break;
4119     }
4120   } /* for */
4121   debugBelch("\n");
4122 }
4123 # elif defined(GRAN)
4124 void 
4125 print_bq (StgClosure *node)
4126 {
4127   StgBlockingQueueElement *bqe;
4128   PEs node_loc, tso_loc;
4129   rtsBool end;
4130
4131   /* should cover all closures that may have a blocking queue */
4132   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4133          get_itbl(node)->type == FETCH_ME_BQ ||
4134          get_itbl(node)->type == RBH);
4135     
4136   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4137   node_loc = where_is(node);
4138
4139   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4140           node, info_type(node), node_loc);
4141
4142   /* 
4143      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4144   */
4145   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4146        !end; // iterate until bqe points to a CONSTR
4147        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4148     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4149     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4150     /* types of closures that may appear in a blocking queue */
4151     ASSERT(get_itbl(bqe)->type == TSO ||           
4152            get_itbl(bqe)->type == CONSTR); 
4153     /* only BQs of an RBH end with an RBH_Save closure */
4154     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4155
4156     tso_loc = where_is((StgClosure *)bqe);
4157     switch (get_itbl(bqe)->type) {
4158     case TSO:
4159       debugBelch(" TSO %d (%p) on [PE %d],",
4160               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4161       break;
4162     case CONSTR:
4163       debugBelch(" %s (IP %p),",
4164               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4165                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4166                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4167                "RBH_Save_?"), get_itbl(bqe));
4168       break;
4169     default:
4170       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4171            info_type((StgClosure *)bqe), node, info_type(node));
4172       break;
4173     }
4174   } /* for */
4175   debugBelch("\n");
4176 }
4177 # endif
4178
4179 #if defined(PARALLEL_HASKELL)
4180 static nat
4181 run_queue_len(void)
4182 {
4183     nat i;
4184     StgTSO *tso;
4185     
4186     for (i=0, tso=run_queue_hd; 
4187          tso != END_TSO_QUEUE;
4188          i++, tso=tso->link) {
4189         /* nothing */
4190     }
4191         
4192     return i;
4193 }
4194 #endif
4195
4196 void
4197 sched_belch(char *s, ...)
4198 {
4199     va_list ap;
4200     va_start(ap,s);
4201 #ifdef THREADED_RTS
4202     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4203 #elif defined(PARALLEL_HASKELL)
4204     debugBelch("== ");
4205 #else
4206     debugBelch("sched: ");
4207 #endif
4208     vdebugBelch(s, ap);
4209     debugBelch("\n");
4210     va_end(ap);
4211 }
4212
4213 #endif /* DEBUG */