7af3ab560f441fe38f81c01b9e75ad210a9b614d
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50
51 #ifdef HAVE_SYS_TYPES_H
52 #include <sys/types.h>
53 #endif
54 #ifdef HAVE_UNISTD_H
55 #include <unistd.h>
56 #endif
57
58 #include <string.h>
59 #include <stdlib.h>
60 #include <stdarg.h>
61
62 #ifdef HAVE_ERRNO_H
63 #include <errno.h>
64 #endif
65
66 // Turn off inlining when debugging - it obfuscates things
67 #ifdef DEBUG
68 # undef  STATIC_INLINE
69 # define STATIC_INLINE static
70 #endif
71
72 #ifdef THREADED_RTS
73 #define USED_WHEN_THREADED_RTS
74 #define USED_WHEN_NON_THREADED_RTS STG_UNUSED
75 #else
76 #define USED_WHEN_THREADED_RTS     STG_UNUSED
77 #define USED_WHEN_NON_THREADED_RTS
78 #endif
79
80 #ifdef SMP
81 #define USED_WHEN_SMP
82 #else
83 #define USED_WHEN_SMP STG_UNUSED
84 #endif
85
86 /* -----------------------------------------------------------------------------
87  * Global variables
88  * -------------------------------------------------------------------------- */
89
90 #if defined(GRAN)
91
92 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
93 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
94
95 /* 
96    In GranSim we have a runnable and a blocked queue for each processor.
97    In order to minimise code changes new arrays run_queue_hds/tls
98    are created. run_queue_hd is then a short cut (macro) for
99    run_queue_hds[CurrentProc] (see GranSim.h).
100    -- HWL
101 */
102 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
103 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
104 StgTSO *ccalling_threadss[MAX_PROC];
105 /* We use the same global list of threads (all_threads) in GranSim as in
106    the std RTS (i.e. we are cheating). However, we don't use this list in
107    the GranSim specific code at the moment (so we are only potentially
108    cheating).  */
109
110 #else /* !GRAN */
111
112 #if !defined(THREADED_RTS)
113 // Blocked/sleeping thrads
114 StgTSO *blocked_queue_hd = NULL;
115 StgTSO *blocked_queue_tl = NULL;
116 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
117 #endif
118
119 /* Threads blocked on blackholes.
120  * LOCK: sched_mutex+capability, or all capabilities
121  */
122 StgTSO *blackhole_queue = NULL;
123 #endif
124
125 /* The blackhole_queue should be checked for threads to wake up.  See
126  * Schedule.h for more thorough comment.
127  * LOCK: none (doesn't matter if we miss an update)
128  */
129 rtsBool blackholes_need_checking = rtsFalse;
130
131 /* Linked list of all threads.
132  * Used for detecting garbage collected threads.
133  * LOCK: sched_mutex+capability, or all capabilities
134  */
135 StgTSO *all_threads = NULL;
136
137 /* flag set by signal handler to precipitate a context switch
138  * LOCK: none (just an advisory flag)
139  */
140 int context_switch = 0;
141
142 /* flag that tracks whether we have done any execution in this time slice.
143  * LOCK: currently none, perhaps we should lock (but needs to be
144  * updated in the fast path of the scheduler).
145  */
146 nat recent_activity = ACTIVITY_YES;
147
148 /* if this flag is set as well, give up execution
149  * LOCK: none (changes once, from false->true)
150  */
151 rtsBool interrupted = rtsFalse;
152
153 /* Next thread ID to allocate.
154  * LOCK: sched_mutex
155  */
156 static StgThreadID next_thread_id = 1;
157
158 /* The smallest stack size that makes any sense is:
159  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
160  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
161  *  + 1                       (the closure to enter)
162  *  + 1                       (stg_ap_v_ret)
163  *  + 1                       (spare slot req'd by stg_ap_v_ret)
164  *
165  * A thread with this stack will bomb immediately with a stack
166  * overflow, which will increase its stack size.  
167  */
168 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
169
170 #if defined(GRAN)
171 StgTSO *CurrentTSO;
172 #endif
173
174 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
175  *  exists - earlier gccs apparently didn't.
176  *  -= chak
177  */
178 StgTSO dummy_tso;
179
180 /*
181  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
182  * in an MT setting, needed to signal that a worker thread shouldn't hang around
183  * in the scheduler when it is out of work.
184  */
185 rtsBool shutting_down_scheduler = rtsFalse;
186
187 /*
188  * This mutex protects most of the global scheduler data in
189  * the THREADED_RTS and (inc. SMP) runtime.
190  */
191 #if defined(THREADED_RTS)
192 Mutex sched_mutex = INIT_MUTEX_VAR;
193 #endif
194
195 #if defined(PARALLEL_HASKELL)
196 StgTSO *LastTSO;
197 rtsTime TimeOfLastYield;
198 rtsBool emitSchedule = rtsTrue;
199 #endif
200
201 /* -----------------------------------------------------------------------------
202  * static function prototypes
203  * -------------------------------------------------------------------------- */
204
205 static Capability *schedule (Capability *initialCapability, Task *task);
206
207 //
208 // These function all encapsulate parts of the scheduler loop, and are
209 // abstracted only to make the structure and control flow of the
210 // scheduler clearer.
211 //
212 static void schedulePreLoop (void);
213 static void scheduleStartSignalHandlers (void);
214 static void scheduleCheckBlockedThreads (Capability *cap);
215 static void scheduleCheckBlackHoles (Capability *cap);
216 static void scheduleDetectDeadlock (Capability *cap, Task *task);
217 #if defined(GRAN)
218 static StgTSO *scheduleProcessEvent(rtsEvent *event);
219 #endif
220 #if defined(PARALLEL_HASKELL)
221 static StgTSO *scheduleSendPendingMessages(void);
222 static void scheduleActivateSpark(void);
223 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
224 #endif
225 #if defined(PAR) || defined(GRAN)
226 static void scheduleGranParReport(void);
227 #endif
228 static void schedulePostRunThread(void);
229 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
230 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
231                                          StgTSO *t);
232 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
233                                     nat prev_what_next );
234 static void scheduleHandleThreadBlocked( StgTSO *t );
235 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
236                                              StgTSO *t );
237 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
238 static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);
239
240 static void unblockThread(Capability *cap, StgTSO *tso);
241 static rtsBool checkBlackHoles(Capability *cap);
242 static void AllRoots(evac_fn evac);
243
244 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
245
246 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
247                         rtsBool stop_at_atomically);
248
249 static void deleteThread (Capability *cap, StgTSO *tso);
250 static void deleteRunQueue (Capability *cap);
251
252 #ifdef DEBUG
253 static void printThreadBlockage(StgTSO *tso);
254 static void printThreadStatus(StgTSO *tso);
255 void printThreadQueue(StgTSO *tso);
256 #endif
257
258 #if defined(PARALLEL_HASKELL)
259 StgTSO * createSparkThread(rtsSpark spark);
260 StgTSO * activateSpark (rtsSpark spark);  
261 #endif
262
263 #ifdef DEBUG
264 static char *whatNext_strs[] = {
265   "(unknown)",
266   "ThreadRunGHC",
267   "ThreadInterpret",
268   "ThreadKilled",
269   "ThreadRelocated",
270   "ThreadComplete"
271 };
272 #endif
273
274 /* -----------------------------------------------------------------------------
275  * Putting a thread on the run queue: different scheduling policies
276  * -------------------------------------------------------------------------- */
277
278 STATIC_INLINE void
279 addToRunQueue( Capability *cap, StgTSO *t )
280 {
281 #if defined(PARALLEL_HASKELL)
282     if (RtsFlags.ParFlags.doFairScheduling) { 
283         // this does round-robin scheduling; good for concurrency
284         appendToRunQueue(cap,t);
285     } else {
286         // this does unfair scheduling; good for parallelism
287         pushOnRunQueue(cap,t);
288     }
289 #else
290     // this does round-robin scheduling; good for concurrency
291     appendToRunQueue(cap,t);
292 #endif
293 }
294
295 /* ---------------------------------------------------------------------------
296    Main scheduling loop.
297
298    We use round-robin scheduling, each thread returning to the
299    scheduler loop when one of these conditions is detected:
300
301       * out of heap space
302       * timer expires (thread yields)
303       * thread blocks
304       * thread ends
305       * stack overflow
306
307    GRAN version:
308      In a GranSim setup this loop iterates over the global event queue.
309      This revolves around the global event queue, which determines what 
310      to do next. Therefore, it's more complicated than either the 
311      concurrent or the parallel (GUM) setup.
312
313    GUM version:
314      GUM iterates over incoming messages.
315      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
316      and sends out a fish whenever it has nothing to do; in-between
317      doing the actual reductions (shared code below) it processes the
318      incoming messages and deals with delayed operations 
319      (see PendingFetches).
320      This is not the ugliest code you could imagine, but it's bloody close.
321
322    ------------------------------------------------------------------------ */
323
324 static Capability *
325 schedule (Capability *initialCapability, Task *task)
326 {
327   StgTSO *t;
328   Capability *cap;
329   StgThreadReturnCode ret;
330 #if defined(GRAN)
331   rtsEvent *event;
332 #elif defined(PARALLEL_HASKELL)
333   StgTSO *tso;
334   GlobalTaskId pe;
335   rtsBool receivedFinish = rtsFalse;
336 # if defined(DEBUG)
337   nat tp_size, sp_size; // stats only
338 # endif
339 #endif
340   nat prev_what_next;
341   rtsBool ready_to_gc;
342   rtsBool first = rtsTrue;
343   
344   cap = initialCapability;
345
346   // Pre-condition: this task owns initialCapability.
347   // The sched_mutex is *NOT* held
348   // NB. on return, we still hold a capability.
349
350   IF_DEBUG(scheduler,
351            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
352                        task, initialCapability);
353       );
354
355   schedulePreLoop();
356
357   // -----------------------------------------------------------
358   // Scheduler loop starts here:
359
360 #if defined(PARALLEL_HASKELL)
361 #define TERMINATION_CONDITION        (!receivedFinish)
362 #elif defined(GRAN)
363 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
364 #else
365 #define TERMINATION_CONDITION        rtsTrue
366 #endif
367
368   while (TERMINATION_CONDITION) {
369
370 #if defined(GRAN)
371       /* Choose the processor with the next event */
372       CurrentProc = event->proc;
373       CurrentTSO = event->tso;
374 #endif
375
376 #if defined(THREADED_RTS)
377       if (first) {
378           // don't yield the first time, we want a chance to run this
379           // thread for a bit, even if there are others banging at the
380           // door.
381           first = rtsFalse;
382       } else {
383           // Yield the capability to higher-priority tasks if necessary.
384           yieldCapability(&cap, task);
385       }
386 #endif
387       
388       ASSERT(cap->running_task == task);
389       ASSERT(task->cap == cap);
390       ASSERT(myTask() == task);
391
392     // Check whether we have re-entered the RTS from Haskell without
393     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
394     // call).
395     if (cap->in_haskell) {
396           errorBelch("schedule: re-entered unsafely.\n"
397                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
398           stg_exit(EXIT_FAILURE);
399     }
400
401     //
402     // Test for interruption.  If interrupted==rtsTrue, then either
403     // we received a keyboard interrupt (^C), or the scheduler is
404     // trying to shut down all the tasks (shutting_down_scheduler) in
405     // the threaded RTS.
406     //
407     if (interrupted) {
408         deleteRunQueue(cap);
409         if (shutting_down_scheduler) {
410             IF_DEBUG(scheduler, sched_belch("shutting down"));
411             // If we are a worker, just exit.  If we're a bound thread
412             // then we will exit below when we've removed our TSO from
413             // the run queue.
414             if (task->tso == NULL) {
415                 return cap;
416             }
417         } else {
418             IF_DEBUG(scheduler, sched_belch("interrupted"));
419         }
420     }
421
422 #if defined(not_yet) && defined(SMP)
423     //
424     // Top up the run queue from our spark pool.  We try to make the
425     // number of threads in the run queue equal to the number of
426     // free capabilities.
427     //
428     {
429         StgClosure *spark;
430         if (emptyRunQueue()) {
431             spark = findSpark(rtsFalse);
432             if (spark == NULL) {
433                 break; /* no more sparks in the pool */
434             } else {
435                 createSparkThread(spark);         
436                 IF_DEBUG(scheduler,
437                          sched_belch("==^^ turning spark of closure %p into a thread",
438                                      (StgClosure *)spark));
439             }
440         }
441     }
442 #endif // SMP
443
444     scheduleStartSignalHandlers();
445
446     // Only check the black holes here if we've nothing else to do.
447     // During normal execution, the black hole list only gets checked
448     // at GC time, to avoid repeatedly traversing this possibly long
449     // list each time around the scheduler.
450     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
451
452     scheduleCheckBlockedThreads(cap);
453
454     scheduleDetectDeadlock(cap,task);
455
456     // Normally, the only way we can get here with no threads to
457     // run is if a keyboard interrupt received during 
458     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
459     // Additionally, it is not fatal for the
460     // threaded RTS to reach here with no threads to run.
461     //
462     // win32: might be here due to awaitEvent() being abandoned
463     // as a result of a console event having been delivered.
464     if ( emptyRunQueue(cap) ) {
465 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
466         ASSERT(interrupted);
467 #endif
468         continue; // nothing to do
469     }
470
471 #if defined(PARALLEL_HASKELL)
472     scheduleSendPendingMessages();
473     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
474         continue;
475
476 #if defined(SPARKS)
477     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
478 #endif
479
480     /* If we still have no work we need to send a FISH to get a spark
481        from another PE */
482     if (emptyRunQueue(cap)) {
483         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
484         ASSERT(rtsFalse); // should not happen at the moment
485     }
486     // from here: non-empty run queue.
487     //  TODO: merge above case with this, only one call processMessages() !
488     if (PacketsWaiting()) {  /* process incoming messages, if
489                                 any pending...  only in else
490                                 because getRemoteWork waits for
491                                 messages as well */
492         receivedFinish = processMessages();
493     }
494 #endif
495
496 #if defined(GRAN)
497     scheduleProcessEvent(event);
498 #endif
499
500     // 
501     // Get a thread to run
502     //
503     t = popRunQueue(cap);
504
505 #if defined(GRAN) || defined(PAR)
506     scheduleGranParReport(); // some kind of debuging output
507 #else
508     // Sanity check the thread we're about to run.  This can be
509     // expensive if there is lots of thread switching going on...
510     IF_DEBUG(sanity,checkTSO(t));
511 #endif
512
513 #if defined(THREADED_RTS)
514     // Check whether we can run this thread in the current task.
515     // If not, we have to pass our capability to the right task.
516     {
517         Task *bound = t->bound;
518       
519         if (bound) {
520             if (bound == task) {
521                 IF_DEBUG(scheduler,
522                          sched_belch("### Running thread %d in bound thread",
523                                      t->id));
524                 // yes, the Haskell thread is bound to the current native thread
525             } else {
526                 IF_DEBUG(scheduler,
527                          sched_belch("### thread %d bound to another OS thread",
528                                      t->id));
529                 // no, bound to a different Haskell thread: pass to that thread
530                 pushOnRunQueue(cap,t);
531                 continue;
532             }
533         } else {
534             // The thread we want to run is unbound.
535             if (task->tso) { 
536                 IF_DEBUG(scheduler,
537                          sched_belch("### this OS thread cannot run thread %d", t->id));
538                 // no, the current native thread is bound to a different
539                 // Haskell thread, so pass it to any worker thread
540                 pushOnRunQueue(cap,t);
541                 continue; 
542             }
543         }
544     }
545 #endif
546
547     cap->r.rCurrentTSO = t;
548     
549     /* context switches are initiated by the timer signal, unless
550      * the user specified "context switch as often as possible", with
551      * +RTS -C0
552      */
553     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
554         && !emptyThreadQueues(cap)) {
555         context_switch = 1;
556     }
557          
558 run_thread:
559
560     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
561                               (long)t->id, whatNext_strs[t->what_next]));
562
563 #if defined(PROFILING)
564     startHeapProfTimer();
565 #endif
566
567     // ----------------------------------------------------------------------
568     // Run the current thread 
569
570     prev_what_next = t->what_next;
571
572     errno = t->saved_errno;
573     cap->in_haskell = rtsTrue;
574
575     recent_activity = ACTIVITY_YES;
576
577     switch (prev_what_next) {
578         
579     case ThreadKilled:
580     case ThreadComplete:
581         /* Thread already finished, return to scheduler. */
582         ret = ThreadFinished;
583         break;
584         
585     case ThreadRunGHC:
586     {
587         StgRegTable *r;
588         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
589         cap = regTableToCapability(r);
590         ret = r->rRet;
591         break;
592     }
593     
594     case ThreadInterpret:
595         cap = interpretBCO(cap);
596         ret = cap->r.rRet;
597         break;
598         
599     default:
600         barf("schedule: invalid what_next field");
601     }
602
603     cap->in_haskell = rtsFalse;
604
605 #ifdef SMP
606     // If ret is ThreadBlocked, and this Task is bound to the TSO that
607     // blocked, we are in limbo - the TSO is now owned by whatever it
608     // is blocked on, and may in fact already have been woken up,
609     // perhaps even on a different Capability.  It may be the case
610     // that task->cap != cap.  We better yield this Capability
611     // immediately and return to normaility.
612     if (ret == ThreadBlocked) continue;
613 #endif
614
615     ASSERT(cap->running_task == task);
616     ASSERT(task->cap == cap);
617     ASSERT(myTask() == task);
618
619     // The TSO might have moved, eg. if it re-entered the RTS and a GC
620     // happened.  So find the new location:
621     t = cap->r.rCurrentTSO;
622
623     // And save the current errno in this thread.
624     t->saved_errno = errno;
625
626     // ----------------------------------------------------------------------
627     
628     // Costs for the scheduler are assigned to CCS_SYSTEM
629 #if defined(PROFILING)
630     stopHeapProfTimer();
631     CCCS = CCS_SYSTEM;
632 #endif
633     
634     // We have run some Haskell code: there might be blackhole-blocked
635     // threads to wake up now.
636     // Lock-free test here should be ok, we're just setting a flag.
637     if ( blackhole_queue != END_TSO_QUEUE ) {
638         blackholes_need_checking = rtsTrue;
639     }
640     
641 #if defined(THREADED_RTS)
642     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
643 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
644     IF_DEBUG(scheduler,debugBelch("sched: "););
645 #endif
646     
647     schedulePostRunThread();
648
649     ready_to_gc = rtsFalse;
650
651     switch (ret) {
652     case HeapOverflow:
653         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
654         break;
655
656     case StackOverflow:
657         scheduleHandleStackOverflow(cap,task,t);
658         break;
659
660     case ThreadYielding:
661         if (scheduleHandleYield(cap, t, prev_what_next)) {
662             // shortcut for switching between compiler/interpreter:
663             goto run_thread; 
664         }
665         break;
666
667     case ThreadBlocked:
668         scheduleHandleThreadBlocked(t);
669         break;
670
671     case ThreadFinished:
672         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
673         break;
674
675     default:
676       barf("schedule: invalid thread return code %d", (int)ret);
677     }
678
679     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
680     if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
681   } /* end of while() */
682
683   IF_PAR_DEBUG(verbose,
684                debugBelch("== Leaving schedule() after having received Finish\n"));
685 }
686
687 /* ----------------------------------------------------------------------------
688  * Setting up the scheduler loop
689  * ------------------------------------------------------------------------- */
690
691 static void
692 schedulePreLoop(void)
693 {
694 #if defined(GRAN) 
695     /* set up first event to get things going */
696     /* ToDo: assign costs for system setup and init MainTSO ! */
697     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
698               ContinueThread, 
699               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
700     
701     IF_DEBUG(gran,
702              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
703                         CurrentTSO);
704              G_TSO(CurrentTSO, 5));
705     
706     if (RtsFlags.GranFlags.Light) {
707         /* Save current time; GranSim Light only */
708         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
709     }      
710 #endif
711 }
712
713 /* ----------------------------------------------------------------------------
714  * Start any pending signal handlers
715  * ------------------------------------------------------------------------- */
716
717 static void
718 scheduleStartSignalHandlers(void)
719 {
720 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
721     if (signals_pending()) { // safe outside the lock
722         startSignalHandlers();
723     }
724 #endif
725 }
726
727 /* ----------------------------------------------------------------------------
728  * Check for blocked threads that can be woken up.
729  * ------------------------------------------------------------------------- */
730
731 static void
732 scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
733 {
734 #if !defined(THREADED_RTS)
735     //
736     // Check whether any waiting threads need to be woken up.  If the
737     // run queue is empty, and there are no other tasks running, we
738     // can wait indefinitely for something to happen.
739     //
740     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
741     {
742         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
743     }
744 #endif
745 }
746
747
748 /* ----------------------------------------------------------------------------
749  * Check for threads blocked on BLACKHOLEs that can be woken up
750  * ------------------------------------------------------------------------- */
751 static void
752 scheduleCheckBlackHoles (Capability *cap)
753 {
754     if ( blackholes_need_checking ) // check without the lock first
755     {
756         ACQUIRE_LOCK(&sched_mutex);
757         if ( blackholes_need_checking ) {
758             checkBlackHoles(cap);
759             blackholes_need_checking = rtsFalse;
760         }
761         RELEASE_LOCK(&sched_mutex);
762     }
763 }
764
765 /* ----------------------------------------------------------------------------
766  * Detect deadlock conditions and attempt to resolve them.
767  * ------------------------------------------------------------------------- */
768
769 static void
770 scheduleDetectDeadlock (Capability *cap, Task *task)
771 {
772
773 #if defined(PARALLEL_HASKELL)
774     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
775     return;
776 #endif
777
778     /* 
779      * Detect deadlock: when we have no threads to run, there are no
780      * threads blocked, waiting for I/O, or sleeping, and all the
781      * other tasks are waiting for work, we must have a deadlock of
782      * some description.
783      */
784     if ( emptyThreadQueues(cap) )
785     {
786 #if defined(THREADED_RTS)
787         /* 
788          * In the threaded RTS, we only check for deadlock if there
789          * has been no activity in a complete timeslice.  This means
790          * we won't eagerly start a full GC just because we don't have
791          * any threads to run currently.
792          */
793         if (recent_activity != ACTIVITY_INACTIVE) return;
794 #endif
795
796         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
797
798         // Garbage collection can release some new threads due to
799         // either (a) finalizers or (b) threads resurrected because
800         // they are unreachable and will therefore be sent an
801         // exception.  Any threads thus released will be immediately
802         // runnable.
803         scheduleDoGC( cap, task, rtsTrue/*force  major GC*/ );
804         recent_activity = ACTIVITY_DONE_GC;
805         
806         if ( !emptyRunQueue(cap) ) return;
807
808 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
809         /* If we have user-installed signal handlers, then wait
810          * for signals to arrive rather then bombing out with a
811          * deadlock.
812          */
813         if ( anyUserHandlers() ) {
814             IF_DEBUG(scheduler, 
815                      sched_belch("still deadlocked, waiting for signals..."));
816
817             awaitUserSignals();
818
819             if (signals_pending()) {
820                 startSignalHandlers();
821             }
822
823             // either we have threads to run, or we were interrupted:
824             ASSERT(!emptyRunQueue(cap) || interrupted);
825         }
826 #endif
827
828 #if !defined(THREADED_RTS)
829         /* Probably a real deadlock.  Send the current main thread the
830          * Deadlock exception.
831          */
832         if (task->tso) {
833             switch (task->tso->why_blocked) {
834             case BlockedOnSTM:
835             case BlockedOnBlackHole:
836             case BlockedOnException:
837             case BlockedOnMVar:
838                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
839                 return;
840             default:
841                 barf("deadlock: main thread blocked in a strange way");
842             }
843         }
844         return;
845 #endif
846     }
847 }
848
849 /* ----------------------------------------------------------------------------
850  * Process an event (GRAN only)
851  * ------------------------------------------------------------------------- */
852
853 #if defined(GRAN)
854 static StgTSO *
855 scheduleProcessEvent(rtsEvent *event)
856 {
857     StgTSO *t;
858
859     if (RtsFlags.GranFlags.Light)
860       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
861
862     /* adjust time based on time-stamp */
863     if (event->time > CurrentTime[CurrentProc] &&
864         event->evttype != ContinueThread)
865       CurrentTime[CurrentProc] = event->time;
866     
867     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
868     if (!RtsFlags.GranFlags.Light)
869       handleIdlePEs();
870
871     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
872
873     /* main event dispatcher in GranSim */
874     switch (event->evttype) {
875       /* Should just be continuing execution */
876     case ContinueThread:
877       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
878       /* ToDo: check assertion
879       ASSERT(run_queue_hd != (StgTSO*)NULL &&
880              run_queue_hd != END_TSO_QUEUE);
881       */
882       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
883       if (!RtsFlags.GranFlags.DoAsyncFetch &&
884           procStatus[CurrentProc]==Fetching) {
885         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
886               CurrentTSO->id, CurrentTSO, CurrentProc);
887         goto next_thread;
888       } 
889       /* Ignore ContinueThreads for completed threads */
890       if (CurrentTSO->what_next == ThreadComplete) {
891         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
892               CurrentTSO->id, CurrentTSO, CurrentProc);
893         goto next_thread;
894       } 
895       /* Ignore ContinueThreads for threads that are being migrated */
896       if (PROCS(CurrentTSO)==Nowhere) { 
897         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
898               CurrentTSO->id, CurrentTSO, CurrentProc);
899         goto next_thread;
900       }
901       /* The thread should be at the beginning of the run queue */
902       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
903         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
904               CurrentTSO->id, CurrentTSO, CurrentProc);
905         break; // run the thread anyway
906       }
907       /*
908       new_event(proc, proc, CurrentTime[proc],
909                 FindWork,
910                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
911       goto next_thread; 
912       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
913       break; // now actually run the thread; DaH Qu'vam yImuHbej 
914
915     case FetchNode:
916       do_the_fetchnode(event);
917       goto next_thread;             /* handle next event in event queue  */
918       
919     case GlobalBlock:
920       do_the_globalblock(event);
921       goto next_thread;             /* handle next event in event queue  */
922       
923     case FetchReply:
924       do_the_fetchreply(event);
925       goto next_thread;             /* handle next event in event queue  */
926       
927     case UnblockThread:   /* Move from the blocked queue to the tail of */
928       do_the_unblock(event);
929       goto next_thread;             /* handle next event in event queue  */
930       
931     case ResumeThread:  /* Move from the blocked queue to the tail of */
932       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
933       event->tso->gran.blocktime += 
934         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
935       do_the_startthread(event);
936       goto next_thread;             /* handle next event in event queue  */
937       
938     case StartThread:
939       do_the_startthread(event);
940       goto next_thread;             /* handle next event in event queue  */
941       
942     case MoveThread:
943       do_the_movethread(event);
944       goto next_thread;             /* handle next event in event queue  */
945       
946     case MoveSpark:
947       do_the_movespark(event);
948       goto next_thread;             /* handle next event in event queue  */
949       
950     case FindWork:
951       do_the_findwork(event);
952       goto next_thread;             /* handle next event in event queue  */
953       
954     default:
955       barf("Illegal event type %u\n", event->evttype);
956     }  /* switch */
957     
958     /* This point was scheduler_loop in the old RTS */
959
960     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
961
962     TimeOfLastEvent = CurrentTime[CurrentProc];
963     TimeOfNextEvent = get_time_of_next_event();
964     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
965     // CurrentTSO = ThreadQueueHd;
966
967     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
968                          TimeOfNextEvent));
969
970     if (RtsFlags.GranFlags.Light) 
971       GranSimLight_leave_system(event, &ActiveTSO); 
972
973     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
974
975     IF_DEBUG(gran, 
976              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
977
978     /* in a GranSim setup the TSO stays on the run queue */
979     t = CurrentTSO;
980     /* Take a thread from the run queue. */
981     POP_RUN_QUEUE(t); // take_off_run_queue(t);
982
983     IF_DEBUG(gran, 
984              debugBelch("GRAN: About to run current thread, which is\n");
985              G_TSO(t,5));
986
987     context_switch = 0; // turned on via GranYield, checking events and time slice
988
989     IF_DEBUG(gran, 
990              DumpGranEvent(GR_SCHEDULE, t));
991
992     procStatus[CurrentProc] = Busy;
993 }
994 #endif // GRAN
995
996 /* ----------------------------------------------------------------------------
997  * Send pending messages (PARALLEL_HASKELL only)
998  * ------------------------------------------------------------------------- */
999
1000 #if defined(PARALLEL_HASKELL)
1001 static StgTSO *
1002 scheduleSendPendingMessages(void)
1003 {
1004     StgSparkPool *pool;
1005     rtsSpark spark;
1006     StgTSO *t;
1007
1008 # if defined(PAR) // global Mem.Mgmt., omit for now
1009     if (PendingFetches != END_BF_QUEUE) {
1010         processFetches();
1011     }
1012 # endif
1013     
1014     if (RtsFlags.ParFlags.BufferTime) {
1015         // if we use message buffering, we must send away all message
1016         // packets which have become too old...
1017         sendOldBuffers(); 
1018     }
1019 }
1020 #endif
1021
1022 /* ----------------------------------------------------------------------------
1023  * Activate spark threads (PARALLEL_HASKELL only)
1024  * ------------------------------------------------------------------------- */
1025
1026 #if defined(PARALLEL_HASKELL)
1027 static void
1028 scheduleActivateSpark(void)
1029 {
1030 #if defined(SPARKS)
1031   ASSERT(emptyRunQueue());
1032 /* We get here if the run queue is empty and want some work.
1033    We try to turn a spark into a thread, and add it to the run queue,
1034    from where it will be picked up in the next iteration of the scheduler
1035    loop.
1036 */
1037
1038       /* :-[  no local threads => look out for local sparks */
1039       /* the spark pool for the current PE */
1040       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1041       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1042           pool->hd < pool->tl) {
1043         /* 
1044          * ToDo: add GC code check that we really have enough heap afterwards!!
1045          * Old comment:
1046          * If we're here (no runnable threads) and we have pending
1047          * sparks, we must have a space problem.  Get enough space
1048          * to turn one of those pending sparks into a
1049          * thread... 
1050          */
1051
1052         spark = findSpark(rtsFalse);            /* get a spark */
1053         if (spark != (rtsSpark) NULL) {
1054           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1055           IF_PAR_DEBUG(fish, // schedule,
1056                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1057                              tso->id, tso, advisory_thread_count));
1058
1059           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1060             IF_PAR_DEBUG(fish, // schedule,
1061                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1062                             spark));
1063             return rtsFalse; /* failed to generate a thread */
1064           }                  /* otherwise fall through & pick-up new tso */
1065         } else {
1066           IF_PAR_DEBUG(fish, // schedule,
1067                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1068                              spark_queue_len(pool)));
1069           return rtsFalse;  /* failed to generate a thread */
1070         }
1071         return rtsTrue;  /* success in generating a thread */
1072   } else { /* no more threads permitted or pool empty */
1073     return rtsFalse;  /* failed to generateThread */
1074   }
1075 #else
1076   tso = NULL; // avoid compiler warning only
1077   return rtsFalse;  /* dummy in non-PAR setup */
1078 #endif // SPARKS
1079 }
1080 #endif // PARALLEL_HASKELL
1081
1082 /* ----------------------------------------------------------------------------
1083  * Get work from a remote node (PARALLEL_HASKELL only)
1084  * ------------------------------------------------------------------------- */
1085     
1086 #if defined(PARALLEL_HASKELL)
1087 static rtsBool
1088 scheduleGetRemoteWork(rtsBool *receivedFinish)
1089 {
1090   ASSERT(emptyRunQueue());
1091
1092   if (RtsFlags.ParFlags.BufferTime) {
1093         IF_PAR_DEBUG(verbose, 
1094                 debugBelch("...send all pending data,"));
1095         {
1096           nat i;
1097           for (i=1; i<=nPEs; i++)
1098             sendImmediately(i); // send all messages away immediately
1099         }
1100   }
1101 # ifndef SPARKS
1102         //++EDEN++ idle() , i.e. send all buffers, wait for work
1103         // suppress fishing in EDEN... just look for incoming messages
1104         // (blocking receive)
1105   IF_PAR_DEBUG(verbose, 
1106                debugBelch("...wait for incoming messages...\n"));
1107   *receivedFinish = processMessages(); // blocking receive...
1108
1109         // and reenter scheduling loop after having received something
1110         // (return rtsFalse below)
1111
1112 # else /* activate SPARKS machinery */
1113 /* We get here, if we have no work, tried to activate a local spark, but still
1114    have no work. We try to get a remote spark, by sending a FISH message.
1115    Thread migration should be added here, and triggered when a sequence of 
1116    fishes returns without work. */
1117         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1118
1119       /* =8-[  no local sparks => look for work on other PEs */
1120         /*
1121          * We really have absolutely no work.  Send out a fish
1122          * (there may be some out there already), and wait for
1123          * something to arrive.  We clearly can't run any threads
1124          * until a SCHEDULE or RESUME arrives, and so that's what
1125          * we're hoping to see.  (Of course, we still have to
1126          * respond to other types of messages.)
1127          */
1128         rtsTime now = msTime() /*CURRENT_TIME*/;
1129         IF_PAR_DEBUG(verbose, 
1130                      debugBelch("--  now=%ld\n", now));
1131         IF_PAR_DEBUG(fish, // verbose,
1132              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1133                  (last_fish_arrived_at!=0 &&
1134                   last_fish_arrived_at+delay > now)) {
1135                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1136                      now, last_fish_arrived_at+delay, 
1137                      last_fish_arrived_at,
1138                      delay);
1139              });
1140   
1141         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1142             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1143           if (last_fish_arrived_at==0 ||
1144               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1145             /* outstandingFishes is set in sendFish, processFish;
1146                avoid flooding system with fishes via delay */
1147     next_fish_to_send_at = 0;  
1148   } else {
1149     /* ToDo: this should be done in the main scheduling loop to avoid the
1150              busy wait here; not so bad if fish delay is very small  */
1151     int iq = 0; // DEBUGGING -- HWL
1152     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1153     /* send a fish when ready, but process messages that arrive in the meantime */
1154     do {
1155       if (PacketsWaiting()) {
1156         iq++; // DEBUGGING
1157         *receivedFinish = processMessages();
1158       }
1159       now = msTime();
1160     } while (!*receivedFinish || now<next_fish_to_send_at);
1161     // JB: This means the fish could become obsolete, if we receive
1162     // work. Better check for work again? 
1163     // last line: while (!receivedFinish || !haveWork || now<...)
1164     // next line: if (receivedFinish || haveWork )
1165
1166     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1167       return rtsFalse;  // NB: this will leave scheduler loop
1168                         // immediately after return!
1169                           
1170     IF_PAR_DEBUG(fish, // verbose,
1171                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1172
1173   }
1174
1175     // JB: IMHO, this should all be hidden inside sendFish(...)
1176     /* pe = choosePE(); 
1177        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1178                 NEW_FISH_HUNGER);
1179
1180     // Global statistics: count no. of fishes
1181     if (RtsFlags.ParFlags.ParStats.Global &&
1182          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1183            globalParStats.tot_fish_mess++;
1184            }
1185     */ 
1186
1187   /* delayed fishes must have been sent by now! */
1188   next_fish_to_send_at = 0;  
1189   }
1190       
1191   *receivedFinish = processMessages();
1192 # endif /* SPARKS */
1193
1194  return rtsFalse;
1195  /* NB: this function always returns rtsFalse, meaning the scheduler
1196     loop continues with the next iteration; 
1197     rationale: 
1198       return code means success in finding work; we enter this function
1199       if there is no local work, thus have to send a fish which takes
1200       time until it arrives with work; in the meantime we should process
1201       messages in the main loop;
1202  */
1203 }
1204 #endif // PARALLEL_HASKELL
1205
1206 /* ----------------------------------------------------------------------------
1207  * PAR/GRAN: Report stats & debugging info(?)
1208  * ------------------------------------------------------------------------- */
1209
1210 #if defined(PAR) || defined(GRAN)
1211 static void
1212 scheduleGranParReport(void)
1213 {
1214   ASSERT(run_queue_hd != END_TSO_QUEUE);
1215
1216   /* Take a thread from the run queue, if we have work */
1217   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1218
1219     /* If this TSO has got its outport closed in the meantime, 
1220      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1221      * It has to be marked as TH_DEAD for this purpose.
1222      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1223
1224 JB: TODO: investigate wether state change field could be nuked
1225      entirely and replaced by the normal tso state (whatnext
1226      field). All we want to do is to kill tsos from outside.
1227      */
1228
1229     /* ToDo: write something to the log-file
1230     if (RTSflags.ParFlags.granSimStats && !sameThread)
1231         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1232
1233     CurrentTSO = t;
1234     */
1235     /* the spark pool for the current PE */
1236     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1237
1238     IF_DEBUG(scheduler, 
1239              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1240                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1241
1242     IF_PAR_DEBUG(fish,
1243              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1244                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1245
1246     if (RtsFlags.ParFlags.ParStats.Full && 
1247         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1248         (emitSchedule || // forced emit
1249          (t && LastTSO && t->id != LastTSO->id))) {
1250       /* 
1251          we are running a different TSO, so write a schedule event to log file
1252          NB: If we use fair scheduling we also have to write  a deschedule 
1253              event for LastTSO; with unfair scheduling we know that the
1254              previous tso has blocked whenever we switch to another tso, so
1255              we don't need it in GUM for now
1256       */
1257       IF_PAR_DEBUG(fish, // schedule,
1258                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1259
1260       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1261                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1262       emitSchedule = rtsFalse;
1263     }
1264 }     
1265 #endif
1266
1267 /* ----------------------------------------------------------------------------
1268  * After running a thread...
1269  * ------------------------------------------------------------------------- */
1270
1271 static void
1272 schedulePostRunThread(void)
1273 {
1274 #if defined(PAR)
1275     /* HACK 675: if the last thread didn't yield, make sure to print a 
1276        SCHEDULE event to the log file when StgRunning the next thread, even
1277        if it is the same one as before */
1278     LastTSO = t; 
1279     TimeOfLastYield = CURRENT_TIME;
1280 #endif
1281
1282   /* some statistics gathering in the parallel case */
1283
1284 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1285   switch (ret) {
1286     case HeapOverflow:
1287 # if defined(GRAN)
1288       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1289       globalGranStats.tot_heapover++;
1290 # elif defined(PAR)
1291       globalParStats.tot_heapover++;
1292 # endif
1293       break;
1294
1295      case StackOverflow:
1296 # if defined(GRAN)
1297       IF_DEBUG(gran, 
1298                DumpGranEvent(GR_DESCHEDULE, t));
1299       globalGranStats.tot_stackover++;
1300 # elif defined(PAR)
1301       // IF_DEBUG(par, 
1302       // DumpGranEvent(GR_DESCHEDULE, t);
1303       globalParStats.tot_stackover++;
1304 # endif
1305       break;
1306
1307     case ThreadYielding:
1308 # if defined(GRAN)
1309       IF_DEBUG(gran, 
1310                DumpGranEvent(GR_DESCHEDULE, t));
1311       globalGranStats.tot_yields++;
1312 # elif defined(PAR)
1313       // IF_DEBUG(par, 
1314       // DumpGranEvent(GR_DESCHEDULE, t);
1315       globalParStats.tot_yields++;
1316 # endif
1317       break; 
1318
1319     case ThreadBlocked:
1320 # if defined(GRAN)
1321       IF_DEBUG(scheduler,
1322                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1323                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1324                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1325                if (t->block_info.closure!=(StgClosure*)NULL)
1326                  print_bq(t->block_info.closure);
1327                debugBelch("\n"));
1328
1329       // ??? needed; should emit block before
1330       IF_DEBUG(gran, 
1331                DumpGranEvent(GR_DESCHEDULE, t)); 
1332       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1333       /*
1334         ngoq Dogh!
1335       ASSERT(procStatus[CurrentProc]==Busy || 
1336               ((procStatus[CurrentProc]==Fetching) && 
1337               (t->block_info.closure!=(StgClosure*)NULL)));
1338       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1339           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1340             procStatus[CurrentProc]==Fetching)) 
1341         procStatus[CurrentProc] = Idle;
1342       */
1343 # elif defined(PAR)
1344 //++PAR++  blockThread() writes the event (change?)
1345 # endif
1346     break;
1347
1348   case ThreadFinished:
1349     break;
1350
1351   default:
1352     barf("parGlobalStats: unknown return code");
1353     break;
1354     }
1355 #endif
1356 }
1357
1358 /* -----------------------------------------------------------------------------
1359  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1360  * -------------------------------------------------------------------------- */
1361
1362 static rtsBool
1363 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1364 {
1365     // did the task ask for a large block?
1366     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1367         // if so, get one and push it on the front of the nursery.
1368         bdescr *bd;
1369         lnat blocks;
1370         
1371         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1372         
1373         IF_DEBUG(scheduler,
1374                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1375                             (long)t->id, whatNext_strs[t->what_next], blocks));
1376         
1377         // don't do this if the nursery is (nearly) full, we'll GC first.
1378         if (cap->r.rCurrentNursery->link != NULL ||
1379             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1380                                                // if the nursery has only one block.
1381             
1382             ACQUIRE_SM_LOCK
1383             bd = allocGroup( blocks );
1384             RELEASE_SM_LOCK
1385             cap->r.rNursery->n_blocks += blocks;
1386             
1387             // link the new group into the list
1388             bd->link = cap->r.rCurrentNursery;
1389             bd->u.back = cap->r.rCurrentNursery->u.back;
1390             if (cap->r.rCurrentNursery->u.back != NULL) {
1391                 cap->r.rCurrentNursery->u.back->link = bd;
1392             } else {
1393 #if !defined(SMP)
1394                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1395                        g0s0 == cap->r.rNursery);
1396 #endif
1397                 cap->r.rNursery->blocks = bd;
1398             }             
1399             cap->r.rCurrentNursery->u.back = bd;
1400             
1401             // initialise it as a nursery block.  We initialise the
1402             // step, gen_no, and flags field of *every* sub-block in
1403             // this large block, because this is easier than making
1404             // sure that we always find the block head of a large
1405             // block whenever we call Bdescr() (eg. evacuate() and
1406             // isAlive() in the GC would both have to do this, at
1407             // least).
1408             { 
1409                 bdescr *x;
1410                 for (x = bd; x < bd + blocks; x++) {
1411                     x->step = cap->r.rNursery;
1412                     x->gen_no = 0;
1413                     x->flags = 0;
1414                 }
1415             }
1416             
1417             // This assert can be a killer if the app is doing lots
1418             // of large block allocations.
1419             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1420             
1421             // now update the nursery to point to the new block
1422             cap->r.rCurrentNursery = bd;
1423             
1424             // we might be unlucky and have another thread get on the
1425             // run queue before us and steal the large block, but in that
1426             // case the thread will just end up requesting another large
1427             // block.
1428             pushOnRunQueue(cap,t);
1429             return rtsFalse;  /* not actually GC'ing */
1430         }
1431     }
1432     
1433     IF_DEBUG(scheduler,
1434              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1435                         (long)t->id, whatNext_strs[t->what_next]));
1436 #if defined(GRAN)
1437     ASSERT(!is_on_queue(t,CurrentProc));
1438 #elif defined(PARALLEL_HASKELL)
1439     /* Currently we emit a DESCHEDULE event before GC in GUM.
1440        ToDo: either add separate event to distinguish SYSTEM time from rest
1441        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1442     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1443         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1444                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1445         emitSchedule = rtsTrue;
1446     }
1447 #endif
1448       
1449     pushOnRunQueue(cap,t);
1450     return rtsTrue;
1451     /* actual GC is done at the end of the while loop in schedule() */
1452 }
1453
1454 /* -----------------------------------------------------------------------------
1455  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1456  * -------------------------------------------------------------------------- */
1457
1458 static void
1459 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1460 {
1461     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1462                                   (long)t->id, whatNext_strs[t->what_next]));
1463     /* just adjust the stack for this thread, then pop it back
1464      * on the run queue.
1465      */
1466     { 
1467         /* enlarge the stack */
1468         StgTSO *new_t = threadStackOverflow(cap, t);
1469         
1470         /* This TSO has moved, so update any pointers to it from the
1471          * main thread stack.  It better not be on any other queues...
1472          * (it shouldn't be).
1473          */
1474         if (task->tso != NULL) {
1475             task->tso = new_t;
1476         }
1477         pushOnRunQueue(cap,new_t);
1478     }
1479 }
1480
1481 /* -----------------------------------------------------------------------------
1482  * Handle a thread that returned to the scheduler with ThreadYielding
1483  * -------------------------------------------------------------------------- */
1484
1485 static rtsBool
1486 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1487 {
1488     // Reset the context switch flag.  We don't do this just before
1489     // running the thread, because that would mean we would lose ticks
1490     // during GC, which can lead to unfair scheduling (a thread hogs
1491     // the CPU because the tick always arrives during GC).  This way
1492     // penalises threads that do a lot of allocation, but that seems
1493     // better than the alternative.
1494     context_switch = 0;
1495     
1496     /* put the thread back on the run queue.  Then, if we're ready to
1497      * GC, check whether this is the last task to stop.  If so, wake
1498      * up the GC thread.  getThread will block during a GC until the
1499      * GC is finished.
1500      */
1501     IF_DEBUG(scheduler,
1502              if (t->what_next != prev_what_next) {
1503                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1504                             (long)t->id, whatNext_strs[t->what_next]);
1505              } else {
1506                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1507                             (long)t->id, whatNext_strs[t->what_next]);
1508              }
1509         );
1510     
1511     IF_DEBUG(sanity,
1512              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1513              checkTSO(t));
1514     ASSERT(t->link == END_TSO_QUEUE);
1515     
1516     // Shortcut if we're just switching evaluators: don't bother
1517     // doing stack squeezing (which can be expensive), just run the
1518     // thread.
1519     if (t->what_next != prev_what_next) {
1520         return rtsTrue;
1521     }
1522     
1523 #if defined(GRAN)
1524     ASSERT(!is_on_queue(t,CurrentProc));
1525       
1526     IF_DEBUG(sanity,
1527              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1528              checkThreadQsSanity(rtsTrue));
1529
1530 #endif
1531
1532     addToRunQueue(cap,t);
1533
1534 #if defined(GRAN)
1535     /* add a ContinueThread event to actually process the thread */
1536     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1537               ContinueThread,
1538               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1539     IF_GRAN_DEBUG(bq, 
1540                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1541                   G_EVENTQ(0);
1542                   G_CURR_THREADQ(0));
1543 #endif
1544     return rtsFalse;
1545 }
1546
1547 /* -----------------------------------------------------------------------------
1548  * Handle a thread that returned to the scheduler with ThreadBlocked
1549  * -------------------------------------------------------------------------- */
1550
1551 static void
1552 scheduleHandleThreadBlocked( StgTSO *t
1553 #if !defined(GRAN) && !defined(DEBUG)
1554     STG_UNUSED
1555 #endif
1556     )
1557 {
1558 #if defined(GRAN)
1559     IF_DEBUG(scheduler,
1560              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1561                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1562              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1563     
1564     // ??? needed; should emit block before
1565     IF_DEBUG(gran, 
1566              DumpGranEvent(GR_DESCHEDULE, t)); 
1567     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1568     /*
1569       ngoq Dogh!
1570       ASSERT(procStatus[CurrentProc]==Busy || 
1571       ((procStatus[CurrentProc]==Fetching) && 
1572       (t->block_info.closure!=(StgClosure*)NULL)));
1573       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1574       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1575       procStatus[CurrentProc]==Fetching)) 
1576       procStatus[CurrentProc] = Idle;
1577     */
1578 #elif defined(PAR)
1579     IF_DEBUG(scheduler,
1580              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1581                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1582     IF_PAR_DEBUG(bq,
1583                  
1584                  if (t->block_info.closure!=(StgClosure*)NULL) 
1585                  print_bq(t->block_info.closure));
1586     
1587     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1588     blockThread(t);
1589     
1590     /* whatever we schedule next, we must log that schedule */
1591     emitSchedule = rtsTrue;
1592     
1593 #else /* !GRAN */
1594
1595       // We don't need to do anything.  The thread is blocked, and it
1596       // has tidied up its stack and placed itself on whatever queue
1597       // it needs to be on.
1598
1599 #if !defined(SMP)
1600     ASSERT(t->why_blocked != NotBlocked);
1601              // This might not be true under SMP: we don't have
1602              // exclusive access to this TSO, so someone might have
1603              // woken it up by now.  This actually happens: try
1604              // conc023 +RTS -N2.
1605 #endif
1606
1607     IF_DEBUG(scheduler,
1608              debugBelch("--<< thread %d (%s) stopped: ", 
1609                         t->id, whatNext_strs[t->what_next]);
1610              printThreadBlockage(t);
1611              debugBelch("\n"));
1612     
1613     /* Only for dumping event to log file 
1614        ToDo: do I need this in GranSim, too?
1615        blockThread(t);
1616     */
1617 #endif
1618 }
1619
1620 /* -----------------------------------------------------------------------------
1621  * Handle a thread that returned to the scheduler with ThreadFinished
1622  * -------------------------------------------------------------------------- */
1623
1624 static rtsBool
1625 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1626 {
1627     /* Need to check whether this was a main thread, and if so,
1628      * return with the return value.
1629      *
1630      * We also end up here if the thread kills itself with an
1631      * uncaught exception, see Exception.cmm.
1632      */
1633     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1634                                   t->id, whatNext_strs[t->what_next]));
1635
1636 #if defined(GRAN)
1637       endThread(t, CurrentProc); // clean-up the thread
1638 #elif defined(PARALLEL_HASKELL)
1639       /* For now all are advisory -- HWL */
1640       //if(t->priority==AdvisoryPriority) ??
1641       advisory_thread_count--; // JB: Caution with this counter, buggy!
1642       
1643 # if defined(DIST)
1644       if(t->dist.priority==RevalPriority)
1645         FinishReval(t);
1646 # endif
1647     
1648 # if defined(EDENOLD)
1649       // the thread could still have an outport... (BUG)
1650       if (t->eden.outport != -1) {
1651       // delete the outport for the tso which has finished...
1652         IF_PAR_DEBUG(eden_ports,
1653                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1654                               t->eden.outport, t->id));
1655         deleteOPT(t);
1656       }
1657       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1658       if (t->eden.epid != -1) {
1659         IF_PAR_DEBUG(eden_ports,
1660                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1661                            t->id, t->eden.epid));
1662         removeTSOfromProcess(t);
1663       }
1664 # endif 
1665
1666 # if defined(PAR)
1667       if (RtsFlags.ParFlags.ParStats.Full &&
1668           !RtsFlags.ParFlags.ParStats.Suppressed) 
1669         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1670
1671       //  t->par only contains statistics: left out for now...
1672       IF_PAR_DEBUG(fish,
1673                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1674                               t->id,t,t->par.sparkname));
1675 # endif
1676 #endif // PARALLEL_HASKELL
1677
1678       //
1679       // Check whether the thread that just completed was a bound
1680       // thread, and if so return with the result.  
1681       //
1682       // There is an assumption here that all thread completion goes
1683       // through this point; we need to make sure that if a thread
1684       // ends up in the ThreadKilled state, that it stays on the run
1685       // queue so it can be dealt with here.
1686       //
1687
1688       if (t->bound) {
1689
1690           if (t->bound != task) {
1691 #if !defined(THREADED_RTS)
1692               // Must be a bound thread that is not the topmost one.  Leave
1693               // it on the run queue until the stack has unwound to the
1694               // point where we can deal with this.  Leaving it on the run
1695               // queue also ensures that the garbage collector knows about
1696               // this thread and its return value (it gets dropped from the
1697               // all_threads list so there's no other way to find it).
1698               appendToRunQueue(cap,t);
1699               return rtsFalse;
1700 #else
1701               // this cannot happen in the threaded RTS, because a
1702               // bound thread can only be run by the appropriate Task.
1703               barf("finished bound thread that isn't mine");
1704 #endif
1705           }
1706
1707           ASSERT(task->tso == t);
1708
1709           if (t->what_next == ThreadComplete) {
1710               if (task->ret) {
1711                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1712                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1713               }
1714               task->stat = Success;
1715           } else {
1716               if (task->ret) {
1717                   *(task->ret) = NULL;
1718               }
1719               if (interrupted) {
1720                   task->stat = Interrupted;
1721               } else {
1722                   task->stat = Killed;
1723               }
1724           }
1725 #ifdef DEBUG
1726           removeThreadLabel((StgWord)task->tso->id);
1727 #endif
1728           return rtsTrue; // tells schedule() to return
1729       }
1730
1731       return rtsFalse;
1732 }
1733
1734 /* -----------------------------------------------------------------------------
1735  * Perform a heap census, if PROFILING
1736  * -------------------------------------------------------------------------- */
1737
1738 static rtsBool
1739 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1740 {
1741 #if defined(PROFILING)
1742     // When we have +RTS -i0 and we're heap profiling, do a census at
1743     // every GC.  This lets us get repeatable runs for debugging.
1744     if (performHeapProfile ||
1745         (RtsFlags.ProfFlags.profileInterval==0 &&
1746          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1747         GarbageCollect(GetRoots, rtsTrue);
1748         heapCensus();
1749         performHeapProfile = rtsFalse;
1750         return rtsTrue;  // true <=> we already GC'd
1751     }
1752 #endif
1753     return rtsFalse;
1754 }
1755
1756 /* -----------------------------------------------------------------------------
1757  * Perform a garbage collection if necessary
1758  * -------------------------------------------------------------------------- */
1759
1760 static void
1761 scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
1762 {
1763     StgTSO *t;
1764 #ifdef SMP
1765     static volatile StgWord waiting_for_gc;
1766     rtsBool was_waiting;
1767     nat i;
1768 #endif
1769
1770 #ifdef SMP
1771     // In order to GC, there must be no threads running Haskell code.
1772     // Therefore, the GC thread needs to hold *all* the capabilities,
1773     // and release them after the GC has completed.  
1774     //
1775     // This seems to be the simplest way: previous attempts involved
1776     // making all the threads with capabilities give up their
1777     // capabilities and sleep except for the *last* one, which
1778     // actually did the GC.  But it's quite hard to arrange for all
1779     // the other tasks to sleep and stay asleep.
1780     //
1781         
1782     was_waiting = cas(&waiting_for_gc, 0, 1);
1783     if (was_waiting) return;
1784
1785     for (i=0; i < n_capabilities; i++) {
1786         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1787         if (cap != &capabilities[i]) {
1788             Capability *pcap = &capabilities[i];
1789             // we better hope this task doesn't get migrated to
1790             // another Capability while we're waiting for this one.
1791             // It won't, because load balancing happens while we have
1792             // all the Capabilities, but even so it's a slightly
1793             // unsavoury invariant.
1794             task->cap = pcap;
1795             waitForReturnCapability(&pcap, task);
1796             if (pcap != &capabilities[i]) {
1797                 barf("scheduleDoGC: got the wrong capability");
1798             }
1799         }
1800     }
1801
1802     waiting_for_gc = rtsFalse;
1803 #endif
1804
1805     /* Kick any transactions which are invalid back to their
1806      * atomically frames.  When next scheduled they will try to
1807      * commit, this commit will fail and they will retry.
1808      */
1809     { 
1810         StgTSO *next;
1811
1812         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1813             if (t->what_next == ThreadRelocated) {
1814                 next = t->link;
1815             } else {
1816                 next = t->global_link;
1817                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1818                     if (!stmValidateNestOfTransactions (t -> trec)) {
1819                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1820                         
1821                         // strip the stack back to the
1822                         // ATOMICALLY_FRAME, aborting the (nested)
1823                         // transaction, and saving the stack of any
1824                         // partially-evaluated thunks on the heap.
1825                         raiseAsync_(cap, t, NULL, rtsTrue);
1826                         
1827 #ifdef REG_R1
1828                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1829 #endif
1830                     }
1831                 }
1832             }
1833         }
1834     }
1835     
1836     // so this happens periodically:
1837     scheduleCheckBlackHoles(cap);
1838     
1839     IF_DEBUG(scheduler, printAllThreads());
1840
1841     /* everybody back, start the GC.
1842      * Could do it in this thread, or signal a condition var
1843      * to do it in another thread.  Either way, we need to
1844      * broadcast on gc_pending_cond afterward.
1845      */
1846 #if defined(THREADED_RTS)
1847     IF_DEBUG(scheduler,sched_belch("doing GC"));
1848 #endif
1849     GarbageCollect(GetRoots, force_major);
1850     
1851 #if defined(SMP)
1852     // release our stash of capabilities.
1853     for (i = 0; i < n_capabilities; i++) {
1854         if (cap != &capabilities[i]) {
1855             task->cap = &capabilities[i];
1856             releaseCapability(&capabilities[i]);
1857         }
1858     }
1859     task->cap = cap;
1860 #endif
1861
1862 #if defined(GRAN)
1863     /* add a ContinueThread event to continue execution of current thread */
1864     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1865               ContinueThread,
1866               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1867     IF_GRAN_DEBUG(bq, 
1868                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1869                   G_EVENTQ(0);
1870                   G_CURR_THREADQ(0));
1871 #endif /* GRAN */
1872 }
1873
1874 /* ---------------------------------------------------------------------------
1875  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1876  * used by Control.Concurrent for error checking.
1877  * ------------------------------------------------------------------------- */
1878  
1879 StgBool
1880 rtsSupportsBoundThreads(void)
1881 {
1882 #if defined(THREADED_RTS)
1883   return rtsTrue;
1884 #else
1885   return rtsFalse;
1886 #endif
1887 }
1888
1889 /* ---------------------------------------------------------------------------
1890  * isThreadBound(tso): check whether tso is bound to an OS thread.
1891  * ------------------------------------------------------------------------- */
1892  
1893 StgBool
1894 isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
1895 {
1896 #if defined(THREADED_RTS)
1897   return (tso->bound != NULL);
1898 #endif
1899   return rtsFalse;
1900 }
1901
1902 /* ---------------------------------------------------------------------------
1903  * Singleton fork(). Do not copy any running threads.
1904  * ------------------------------------------------------------------------- */
1905
1906 #if !defined(mingw32_HOST_OS) && !defined(SMP)
1907 #define FORKPROCESS_PRIMOP_SUPPORTED
1908 #endif
1909
1910 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1911 static void 
1912 deleteThreadImmediately(Capability *cap, StgTSO *tso);
1913 #endif
1914 StgInt
1915 forkProcess(HsStablePtr *entry
1916 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1917             STG_UNUSED
1918 #endif
1919            )
1920 {
1921 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1922     pid_t pid;
1923     StgTSO* t,*next;
1924     Task *task;
1925     Capability *cap;
1926     
1927     IF_DEBUG(scheduler,sched_belch("forking!"));
1928     
1929     // ToDo: for SMP, we should probably acquire *all* the capabilities
1930     cap = rts_lock();
1931     
1932     pid = fork();
1933     
1934     if (pid) { // parent
1935         
1936         // just return the pid
1937         rts_unlock(cap);
1938         return pid;
1939         
1940     } else { // child
1941         
1942         // delete all threads
1943         cap->run_queue_hd = END_TSO_QUEUE;
1944         cap->run_queue_tl = END_TSO_QUEUE;
1945         
1946         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1947             next = t->link;
1948             
1949             // don't allow threads to catch the ThreadKilled exception
1950             deleteThreadImmediately(cap,t);
1951         }
1952         
1953         // wipe the main thread list
1954         while ((task = all_tasks) != NULL) {
1955             all_tasks = task->all_link;
1956             discardTask(task);
1957         }
1958         
1959         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1960         rts_checkSchedStatus("forkProcess",cap);
1961         
1962         rts_unlock(cap);
1963         hs_exit();                      // clean up and exit
1964         stg_exit(EXIT_SUCCESS);
1965     }
1966 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1967     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1968     return -1;
1969 #endif
1970 }
1971
1972 /* ---------------------------------------------------------------------------
1973  * Delete the threads on the run queue of the current capability.
1974  * ------------------------------------------------------------------------- */
1975    
1976 static void
1977 deleteRunQueue (Capability *cap)
1978 {
1979     StgTSO *t, *next;
1980     for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
1981         ASSERT(t->what_next != ThreadRelocated);
1982         next = t->link;
1983         deleteThread(cap, t);
1984     }
1985 }
1986
1987 /* startThread and  insertThread are now in GranSim.c -- HWL */
1988
1989
1990 /* -----------------------------------------------------------------------------
1991    Managing the suspended_ccalling_tasks list.
1992    Locks required: sched_mutex
1993    -------------------------------------------------------------------------- */
1994
1995 STATIC_INLINE void
1996 suspendTask (Capability *cap, Task *task)
1997 {
1998     ASSERT(task->next == NULL && task->prev == NULL);
1999     task->next = cap->suspended_ccalling_tasks;
2000     task->prev = NULL;
2001     if (cap->suspended_ccalling_tasks) {
2002         cap->suspended_ccalling_tasks->prev = task;
2003     }
2004     cap->suspended_ccalling_tasks = task;
2005 }
2006
2007 STATIC_INLINE void
2008 recoverSuspendedTask (Capability *cap, Task *task)
2009 {
2010     if (task->prev) {
2011         task->prev->next = task->next;
2012     } else {
2013         ASSERT(cap->suspended_ccalling_tasks == task);
2014         cap->suspended_ccalling_tasks = task->next;
2015     }
2016     if (task->next) {
2017         task->next->prev = task->prev;
2018     }
2019     task->next = task->prev = NULL;
2020 }
2021
2022 /* ---------------------------------------------------------------------------
2023  * Suspending & resuming Haskell threads.
2024  * 
2025  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2026  * its capability before calling the C function.  This allows another
2027  * task to pick up the capability and carry on running Haskell
2028  * threads.  It also means that if the C call blocks, it won't lock
2029  * the whole system.
2030  *
2031  * The Haskell thread making the C call is put to sleep for the
2032  * duration of the call, on the susepended_ccalling_threads queue.  We
2033  * give out a token to the task, which it can use to resume the thread
2034  * on return from the C function.
2035  * ------------------------------------------------------------------------- */
2036    
2037 void *
2038 suspendThread (StgRegTable *reg)
2039 {
2040   Capability *cap;
2041   int saved_errno = errno;
2042   StgTSO *tso;
2043   Task *task;
2044
2045   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2046    */
2047   cap = regTableToCapability(reg);
2048
2049   task = cap->running_task;
2050   tso = cap->r.rCurrentTSO;
2051
2052   IF_DEBUG(scheduler,
2053            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2054
2055   // XXX this might not be necessary --SDM
2056   tso->what_next = ThreadRunGHC;
2057
2058   threadPaused(tso);
2059
2060   if(tso->blocked_exceptions == NULL)  {
2061       tso->why_blocked = BlockedOnCCall;
2062       tso->blocked_exceptions = END_TSO_QUEUE;
2063   } else {
2064       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2065   }
2066
2067   // Hand back capability
2068   task->suspended_tso = tso;
2069
2070   ACQUIRE_LOCK(&cap->lock);
2071
2072   suspendTask(cap,task);
2073   cap->in_haskell = rtsFalse;
2074   releaseCapability_(cap);
2075   
2076   RELEASE_LOCK(&cap->lock);
2077
2078 #if defined(THREADED_RTS)
2079   /* Preparing to leave the RTS, so ensure there's a native thread/task
2080      waiting to take over.
2081   */
2082   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2083 #endif
2084
2085   errno = saved_errno;
2086   return task;
2087 }
2088
2089 StgRegTable *
2090 resumeThread (void *task_)
2091 {
2092     StgTSO *tso;
2093     Capability *cap;
2094     int saved_errno = errno;
2095     Task *task = task_;
2096
2097     cap = task->cap;
2098     // Wait for permission to re-enter the RTS with the result.
2099     waitForReturnCapability(&cap,task);
2100     // we might be on a different capability now... but if so, our
2101     // entry on the suspended_ccalling_tasks list will also have been
2102     // migrated.
2103
2104     // Remove the thread from the suspended list
2105     recoverSuspendedTask(cap,task);
2106
2107     tso = task->suspended_tso;
2108     task->suspended_tso = NULL;
2109     tso->link = END_TSO_QUEUE;
2110     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2111     
2112     if (tso->why_blocked == BlockedOnCCall) {
2113         awakenBlockedQueue(cap,tso->blocked_exceptions);
2114         tso->blocked_exceptions = NULL;
2115     }
2116     
2117     /* Reset blocking status */
2118     tso->why_blocked  = NotBlocked;
2119     
2120     cap->r.rCurrentTSO = tso;
2121     cap->in_haskell = rtsTrue;
2122     errno = saved_errno;
2123
2124     return &cap->r;
2125 }
2126
2127 /* ---------------------------------------------------------------------------
2128  * Comparing Thread ids.
2129  *
2130  * This is used from STG land in the implementation of the
2131  * instances of Eq/Ord for ThreadIds.
2132  * ------------------------------------------------------------------------ */
2133
2134 int
2135 cmp_thread(StgPtr tso1, StgPtr tso2) 
2136
2137   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2138   StgThreadID id2 = ((StgTSO *)tso2)->id;
2139  
2140   if (id1 < id2) return (-1);
2141   if (id1 > id2) return 1;
2142   return 0;
2143 }
2144
2145 /* ---------------------------------------------------------------------------
2146  * Fetching the ThreadID from an StgTSO.
2147  *
2148  * This is used in the implementation of Show for ThreadIds.
2149  * ------------------------------------------------------------------------ */
2150 int
2151 rts_getThreadId(StgPtr tso) 
2152 {
2153   return ((StgTSO *)tso)->id;
2154 }
2155
2156 #ifdef DEBUG
2157 void
2158 labelThread(StgPtr tso, char *label)
2159 {
2160   int len;
2161   void *buf;
2162
2163   /* Caveat: Once set, you can only set the thread name to "" */
2164   len = strlen(label)+1;
2165   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2166   strncpy(buf,label,len);
2167   /* Update will free the old memory for us */
2168   updateThreadLabel(((StgTSO *)tso)->id,buf);
2169 }
2170 #endif /* DEBUG */
2171
2172 /* ---------------------------------------------------------------------------
2173    Create a new thread.
2174
2175    The new thread starts with the given stack size.  Before the
2176    scheduler can run, however, this thread needs to have a closure
2177    (and possibly some arguments) pushed on its stack.  See
2178    pushClosure() in Schedule.h.
2179
2180    createGenThread() and createIOThread() (in SchedAPI.h) are
2181    convenient packaged versions of this function.
2182
2183    currently pri (priority) is only used in a GRAN setup -- HWL
2184    ------------------------------------------------------------------------ */
2185 #if defined(GRAN)
2186 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2187 StgTSO *
2188 createThread(nat size, StgInt pri)
2189 #else
2190 StgTSO *
2191 createThread(Capability *cap, nat size)
2192 #endif
2193 {
2194     StgTSO *tso;
2195     nat stack_size;
2196
2197     /* sched_mutex is *not* required */
2198
2199     /* First check whether we should create a thread at all */
2200 #if defined(PARALLEL_HASKELL)
2201     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2202     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2203         threadsIgnored++;
2204         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2205                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2206         return END_TSO_QUEUE;
2207     }
2208     threadsCreated++;
2209 #endif
2210
2211 #if defined(GRAN)
2212     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2213 #endif
2214
2215     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2216
2217     /* catch ridiculously small stack sizes */
2218     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2219         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2220     }
2221
2222     stack_size = size - TSO_STRUCT_SIZEW;
2223     
2224     tso = (StgTSO *)allocateLocal(cap, size);
2225     TICK_ALLOC_TSO(stack_size, 0);
2226
2227     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2228 #if defined(GRAN)
2229     SET_GRAN_HDR(tso, ThisPE);
2230 #endif
2231
2232     // Always start with the compiled code evaluator
2233     tso->what_next = ThreadRunGHC;
2234
2235     tso->why_blocked  = NotBlocked;
2236     tso->blocked_exceptions = NULL;
2237     
2238     tso->saved_errno = 0;
2239     tso->bound = NULL;
2240     
2241     tso->stack_size     = stack_size;
2242     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2243                           - TSO_STRUCT_SIZEW;
2244     tso->sp             = (P_)&(tso->stack) + stack_size;
2245
2246     tso->trec = NO_TREC;
2247     
2248 #ifdef PROFILING
2249     tso->prof.CCCS = CCS_MAIN;
2250 #endif
2251     
2252   /* put a stop frame on the stack */
2253     tso->sp -= sizeofW(StgStopFrame);
2254     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2255     tso->link = END_TSO_QUEUE;
2256     
2257   // ToDo: check this
2258 #if defined(GRAN)
2259     /* uses more flexible routine in GranSim */
2260     insertThread(tso, CurrentProc);
2261 #else
2262     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2263      * from its creation
2264      */
2265 #endif
2266     
2267 #if defined(GRAN) 
2268     if (RtsFlags.GranFlags.GranSimStats.Full) 
2269         DumpGranEvent(GR_START,tso);
2270 #elif defined(PARALLEL_HASKELL)
2271     if (RtsFlags.ParFlags.ParStats.Full) 
2272         DumpGranEvent(GR_STARTQ,tso);
2273     /* HACk to avoid SCHEDULE 
2274        LastTSO = tso; */
2275 #endif
2276     
2277     /* Link the new thread on the global thread list.
2278      */
2279     ACQUIRE_LOCK(&sched_mutex);
2280     tso->id = next_thread_id++;  // while we have the mutex
2281     tso->global_link = all_threads;
2282     all_threads = tso;
2283     RELEASE_LOCK(&sched_mutex);
2284     
2285 #if defined(DIST)
2286     tso->dist.priority = MandatoryPriority; //by default that is...
2287 #endif
2288     
2289 #if defined(GRAN)
2290     tso->gran.pri = pri;
2291 # if defined(DEBUG)
2292     tso->gran.magic = TSO_MAGIC; // debugging only
2293 # endif
2294     tso->gran.sparkname   = 0;
2295     tso->gran.startedat   = CURRENT_TIME; 
2296     tso->gran.exported    = 0;
2297     tso->gran.basicblocks = 0;
2298     tso->gran.allocs      = 0;
2299     tso->gran.exectime    = 0;
2300     tso->gran.fetchtime   = 0;
2301     tso->gran.fetchcount  = 0;
2302     tso->gran.blocktime   = 0;
2303     tso->gran.blockcount  = 0;
2304     tso->gran.blockedat   = 0;
2305     tso->gran.globalsparks = 0;
2306     tso->gran.localsparks  = 0;
2307     if (RtsFlags.GranFlags.Light)
2308         tso->gran.clock  = Now; /* local clock */
2309     else
2310         tso->gran.clock  = 0;
2311     
2312     IF_DEBUG(gran,printTSO(tso));
2313 #elif defined(PARALLEL_HASKELL)
2314 # if defined(DEBUG)
2315     tso->par.magic = TSO_MAGIC; // debugging only
2316 # endif
2317     tso->par.sparkname   = 0;
2318     tso->par.startedat   = CURRENT_TIME; 
2319     tso->par.exported    = 0;
2320     tso->par.basicblocks = 0;
2321     tso->par.allocs      = 0;
2322     tso->par.exectime    = 0;
2323     tso->par.fetchtime   = 0;
2324     tso->par.fetchcount  = 0;
2325     tso->par.blocktime   = 0;
2326     tso->par.blockcount  = 0;
2327     tso->par.blockedat   = 0;
2328     tso->par.globalsparks = 0;
2329     tso->par.localsparks  = 0;
2330 #endif
2331     
2332 #if defined(GRAN)
2333     globalGranStats.tot_threads_created++;
2334     globalGranStats.threads_created_on_PE[CurrentProc]++;
2335     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2336     globalGranStats.tot_sq_probes++;
2337 #elif defined(PARALLEL_HASKELL)
2338     // collect parallel global statistics (currently done together with GC stats)
2339     if (RtsFlags.ParFlags.ParStats.Global &&
2340         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2341         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2342         globalParStats.tot_threads_created++;
2343     }
2344 #endif 
2345     
2346 #if defined(GRAN)
2347     IF_GRAN_DEBUG(pri,
2348                   sched_belch("==__ schedule: Created TSO %d (%p);",
2349                               CurrentProc, tso, tso->id));
2350 #elif defined(PARALLEL_HASKELL)
2351     IF_PAR_DEBUG(verbose,
2352                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2353                              (long)tso->id, tso, advisory_thread_count));
2354 #else
2355     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2356                                    (long)tso->id, (long)tso->stack_size));
2357 #endif    
2358     return tso;
2359 }
2360
2361 #if defined(PAR)
2362 /* RFP:
2363    all parallel thread creation calls should fall through the following routine.
2364 */
2365 StgTSO *
2366 createThreadFromSpark(rtsSpark spark) 
2367 { StgTSO *tso;
2368   ASSERT(spark != (rtsSpark)NULL);
2369 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2370   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2371   { threadsIgnored++;
2372     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2373           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2374     return END_TSO_QUEUE;
2375   }
2376   else
2377   { threadsCreated++;
2378     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2379     if (tso==END_TSO_QUEUE)     
2380       barf("createSparkThread: Cannot create TSO");
2381 #if defined(DIST)
2382     tso->priority = AdvisoryPriority;
2383 #endif
2384     pushClosure(tso,spark);
2385     addToRunQueue(tso);
2386     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2387   }
2388   return tso;
2389 }
2390 #endif
2391
2392 /*
2393   Turn a spark into a thread.
2394   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2395 */
2396 #if 0
2397 StgTSO *
2398 activateSpark (rtsSpark spark) 
2399 {
2400   StgTSO *tso;
2401
2402   tso = createSparkThread(spark);
2403   if (RtsFlags.ParFlags.ParStats.Full) {   
2404     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2405       IF_PAR_DEBUG(verbose,
2406                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2407                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2408   }
2409   // ToDo: fwd info on local/global spark to thread -- HWL
2410   // tso->gran.exported =  spark->exported;
2411   // tso->gran.locked =   !spark->global;
2412   // tso->gran.sparkname = spark->name;
2413
2414   return tso;
2415 }
2416 #endif
2417
2418 /* ---------------------------------------------------------------------------
2419  * scheduleThread()
2420  *
2421  * scheduleThread puts a thread on the end  of the runnable queue.
2422  * This will usually be done immediately after a thread is created.
2423  * The caller of scheduleThread must create the thread using e.g.
2424  * createThread and push an appropriate closure
2425  * on this thread's stack before the scheduler is invoked.
2426  * ------------------------------------------------------------------------ */
2427
2428 void
2429 scheduleThread(Capability *cap, StgTSO *tso)
2430 {
2431     // The thread goes at the *end* of the run-queue, to avoid possible
2432     // starvation of any threads already on the queue.
2433     appendToRunQueue(cap,tso);
2434 }
2435
2436 Capability *
2437 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2438 {
2439     Task *task;
2440
2441     // We already created/initialised the Task
2442     task = cap->running_task;
2443
2444     // This TSO is now a bound thread; make the Task and TSO
2445     // point to each other.
2446     tso->bound = task;
2447
2448     task->tso = tso;
2449     task->ret = ret;
2450     task->stat = NoStatus;
2451
2452     appendToRunQueue(cap,tso);
2453
2454     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2455
2456 #if defined(GRAN)
2457     /* GranSim specific init */
2458     CurrentTSO = m->tso;                // the TSO to run
2459     procStatus[MainProc] = Busy;        // status of main PE
2460     CurrentProc = MainProc;             // PE to run it on
2461 #endif
2462
2463     cap = schedule(cap,task);
2464
2465     ASSERT(task->stat != NoStatus);
2466     ASSERT(cap->running_task == task);
2467     ASSERT(task->cap == cap);
2468
2469     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2470     return cap;
2471 }
2472
2473 /* ----------------------------------------------------------------------------
2474  * Starting Tasks
2475  * ------------------------------------------------------------------------- */
2476
2477 #if defined(THREADED_RTS)
2478 void
2479 workerStart(Task *task)
2480 {
2481     Capability *cap;
2482
2483     // See startWorkerTask().
2484     ACQUIRE_LOCK(&task->lock);
2485     cap = task->cap;
2486     RELEASE_LOCK(&task->lock);
2487
2488     // set the thread-local pointer to the Task:
2489     taskEnter(task);
2490
2491     // schedule() runs without a lock.
2492     cap = schedule(cap,task);
2493
2494     // On exit from schedule(), we have a Capability.
2495     releaseCapability(cap);
2496     taskStop(task);
2497 }
2498 #endif
2499
2500 /* ---------------------------------------------------------------------------
2501  * initScheduler()
2502  *
2503  * Initialise the scheduler.  This resets all the queues - if the
2504  * queues contained any threads, they'll be garbage collected at the
2505  * next pass.
2506  *
2507  * ------------------------------------------------------------------------ */
2508
2509 void 
2510 initScheduler(void)
2511 {
2512 #if defined(GRAN)
2513   nat i;
2514   for (i=0; i<=MAX_PROC; i++) {
2515     run_queue_hds[i]      = END_TSO_QUEUE;
2516     run_queue_tls[i]      = END_TSO_QUEUE;
2517     blocked_queue_hds[i]  = END_TSO_QUEUE;
2518     blocked_queue_tls[i]  = END_TSO_QUEUE;
2519     ccalling_threadss[i]  = END_TSO_QUEUE;
2520     blackhole_queue[i]    = END_TSO_QUEUE;
2521     sleeping_queue        = END_TSO_QUEUE;
2522   }
2523 #elif !defined(THREADED_RTS)
2524   blocked_queue_hd  = END_TSO_QUEUE;
2525   blocked_queue_tl  = END_TSO_QUEUE;
2526   sleeping_queue    = END_TSO_QUEUE;
2527 #endif
2528
2529   blackhole_queue   = END_TSO_QUEUE;
2530   all_threads       = END_TSO_QUEUE;
2531
2532   context_switch = 0;
2533   interrupted    = 0;
2534
2535   RtsFlags.ConcFlags.ctxtSwitchTicks =
2536       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2537       
2538 #if defined(THREADED_RTS)
2539   /* Initialise the mutex and condition variables used by
2540    * the scheduler. */
2541   initMutex(&sched_mutex);
2542 #endif
2543   
2544   ACQUIRE_LOCK(&sched_mutex);
2545
2546   /* A capability holds the state a native thread needs in
2547    * order to execute STG code. At least one capability is
2548    * floating around (only SMP builds have more than one).
2549    */
2550   initCapabilities();
2551
2552   initTaskManager();
2553
2554 #if defined(SMP)
2555   /*
2556    * Eagerly start one worker to run each Capability, except for
2557    * Capability 0.  The idea is that we're probably going to start a
2558    * bound thread on Capability 0 pretty soon, so we don't want a
2559    * worker task hogging it.
2560    */
2561   { 
2562       nat i;
2563       Capability *cap;
2564       for (i = 1; i < n_capabilities; i++) {
2565           cap = &capabilities[i];
2566           ACQUIRE_LOCK(&cap->lock);
2567           startWorkerTask(cap, workerStart);
2568           RELEASE_LOCK(&cap->lock);
2569       }
2570   }
2571 #endif
2572
2573 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2574   initSparkPools();
2575 #endif
2576
2577   RELEASE_LOCK(&sched_mutex);
2578 }
2579
2580 void
2581 exitScheduler( void )
2582 {
2583     interrupted = rtsTrue;
2584     shutting_down_scheduler = rtsTrue;
2585
2586 #if defined(THREADED_RTS)
2587     { 
2588         Task *task;
2589         nat i;
2590         
2591         ACQUIRE_LOCK(&sched_mutex);
2592         task = newBoundTask();
2593         RELEASE_LOCK(&sched_mutex);
2594
2595         for (i = 0; i < n_capabilities; i++) {
2596             shutdownCapability(&capabilities[i], task);
2597         }
2598         boundTaskExiting(task);
2599         stopTaskManager();
2600     }
2601 #endif
2602 }
2603
2604 /* ---------------------------------------------------------------------------
2605    Where are the roots that we know about?
2606
2607         - all the threads on the runnable queue
2608         - all the threads on the blocked queue
2609         - all the threads on the sleeping queue
2610         - all the thread currently executing a _ccall_GC
2611         - all the "main threads"
2612      
2613    ------------------------------------------------------------------------ */
2614
2615 /* This has to be protected either by the scheduler monitor, or by the
2616         garbage collection monitor (probably the latter).
2617         KH @ 25/10/99
2618 */
2619
2620 void
2621 GetRoots( evac_fn evac )
2622 {
2623     nat i;
2624     Capability *cap;
2625     Task *task;
2626
2627 #if defined(GRAN)
2628     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2629         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2630             evac((StgClosure **)&run_queue_hds[i]);
2631         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2632             evac((StgClosure **)&run_queue_tls[i]);
2633         
2634         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2635             evac((StgClosure **)&blocked_queue_hds[i]);
2636         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2637             evac((StgClosure **)&blocked_queue_tls[i]);
2638         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2639             evac((StgClosure **)&ccalling_threads[i]);
2640     }
2641
2642     markEventQueue();
2643
2644 #else /* !GRAN */
2645
2646     for (i = 0; i < n_capabilities; i++) {
2647         cap = &capabilities[i];
2648         evac((StgClosure **)&cap->run_queue_hd);
2649         evac((StgClosure **)&cap->run_queue_tl);
2650         
2651         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2652              task=task->next) {
2653             evac((StgClosure **)&task->suspended_tso);
2654         }
2655     }
2656     
2657 #if !defined(THREADED_RTS)
2658     evac((StgClosure **)&blocked_queue_hd);
2659     evac((StgClosure **)&blocked_queue_tl);
2660     evac((StgClosure **)&sleeping_queue);
2661 #endif 
2662 #endif
2663
2664     evac((StgClosure **)&blackhole_queue);
2665
2666 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2667     markSparkQueue(evac);
2668 #endif
2669     
2670 #if defined(RTS_USER_SIGNALS)
2671     // mark the signal handlers (signals should be already blocked)
2672     markSignalHandlers(evac);
2673 #endif
2674 }
2675
2676 /* -----------------------------------------------------------------------------
2677    performGC
2678
2679    This is the interface to the garbage collector from Haskell land.
2680    We provide this so that external C code can allocate and garbage
2681    collect when called from Haskell via _ccall_GC.
2682
2683    It might be useful to provide an interface whereby the programmer
2684    can specify more roots (ToDo).
2685    
2686    This needs to be protected by the GC condition variable above.  KH.
2687    -------------------------------------------------------------------------- */
2688
2689 static void (*extra_roots)(evac_fn);
2690
2691 void
2692 performGC(void)
2693 {
2694 #ifdef THREADED_RTS
2695     // ToDo: we have to grab all the capabilities here.
2696     errorBelch("performGC not supported in threaded RTS (yet)");
2697     stg_exit(EXIT_FAILURE);
2698 #endif
2699     /* Obligated to hold this lock upon entry */
2700     GarbageCollect(GetRoots,rtsFalse);
2701 }
2702
2703 void
2704 performMajorGC(void)
2705 {
2706 #ifdef THREADED_RTS
2707     errorBelch("performMayjorGC not supported in threaded RTS (yet)");
2708     stg_exit(EXIT_FAILURE);
2709 #endif
2710     GarbageCollect(GetRoots,rtsTrue);
2711 }
2712
2713 static void
2714 AllRoots(evac_fn evac)
2715 {
2716     GetRoots(evac);             // the scheduler's roots
2717     extra_roots(evac);          // the user's roots
2718 }
2719
2720 void
2721 performGCWithRoots(void (*get_roots)(evac_fn))
2722 {
2723 #ifdef THREADED_RTS
2724     errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
2725     stg_exit(EXIT_FAILURE);
2726 #endif
2727     extra_roots = get_roots;
2728     GarbageCollect(AllRoots,rtsFalse);
2729 }
2730
2731 /* -----------------------------------------------------------------------------
2732    Stack overflow
2733
2734    If the thread has reached its maximum stack size, then raise the
2735    StackOverflow exception in the offending thread.  Otherwise
2736    relocate the TSO into a larger chunk of memory and adjust its stack
2737    size appropriately.
2738    -------------------------------------------------------------------------- */
2739
2740 static StgTSO *
2741 threadStackOverflow(Capability *cap, StgTSO *tso)
2742 {
2743   nat new_stack_size, stack_words;
2744   lnat new_tso_size;
2745   StgPtr new_sp;
2746   StgTSO *dest;
2747
2748   IF_DEBUG(sanity,checkTSO(tso));
2749   if (tso->stack_size >= tso->max_stack_size) {
2750
2751     IF_DEBUG(gc,
2752              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2753                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2754              /* If we're debugging, just print out the top of the stack */
2755              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2756                                               tso->sp+64)));
2757
2758     /* Send this thread the StackOverflow exception */
2759     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2760     return tso;
2761   }
2762
2763   /* Try to double the current stack size.  If that takes us over the
2764    * maximum stack size for this thread, then use the maximum instead.
2765    * Finally round up so the TSO ends up as a whole number of blocks.
2766    */
2767   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2768   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2769                                        TSO_STRUCT_SIZE)/sizeof(W_);
2770   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2771   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2772
2773   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", tso->stack_size, new_stack_size));
2774
2775   dest = (StgTSO *)allocate(new_tso_size);
2776   TICK_ALLOC_TSO(new_stack_size,0);
2777
2778   /* copy the TSO block and the old stack into the new area */
2779   memcpy(dest,tso,TSO_STRUCT_SIZE);
2780   stack_words = tso->stack + tso->stack_size - tso->sp;
2781   new_sp = (P_)dest + new_tso_size - stack_words;
2782   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2783
2784   /* relocate the stack pointers... */
2785   dest->sp         = new_sp;
2786   dest->stack_size = new_stack_size;
2787         
2788   /* Mark the old TSO as relocated.  We have to check for relocated
2789    * TSOs in the garbage collector and any primops that deal with TSOs.
2790    *
2791    * It's important to set the sp value to just beyond the end
2792    * of the stack, so we don't attempt to scavenge any part of the
2793    * dead TSO's stack.
2794    */
2795   tso->what_next = ThreadRelocated;
2796   tso->link = dest;
2797   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2798   tso->why_blocked = NotBlocked;
2799
2800   IF_PAR_DEBUG(verbose,
2801                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2802                      tso->id, tso, tso->stack_size);
2803                /* If we're debugging, just print out the top of the stack */
2804                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2805                                                 tso->sp+64)));
2806   
2807   IF_DEBUG(sanity,checkTSO(tso));
2808 #if 0
2809   IF_DEBUG(scheduler,printTSO(dest));
2810 #endif
2811
2812   return dest;
2813 }
2814
2815 /* ---------------------------------------------------------------------------
2816    Wake up a queue that was blocked on some resource.
2817    ------------------------------------------------------------------------ */
2818
2819 #if defined(GRAN)
2820 STATIC_INLINE void
2821 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2822 {
2823 }
2824 #elif defined(PARALLEL_HASKELL)
2825 STATIC_INLINE void
2826 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2827 {
2828   /* write RESUME events to log file and
2829      update blocked and fetch time (depending on type of the orig closure) */
2830   if (RtsFlags.ParFlags.ParStats.Full) {
2831     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2832                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2833                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2834     if (emptyRunQueue())
2835       emitSchedule = rtsTrue;
2836
2837     switch (get_itbl(node)->type) {
2838         case FETCH_ME_BQ:
2839           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2840           break;
2841         case RBH:
2842         case FETCH_ME:
2843         case BLACKHOLE_BQ:
2844           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2845           break;
2846 #ifdef DIST
2847         case MVAR:
2848           break;
2849 #endif    
2850         default:
2851           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2852         }
2853       }
2854 }
2855 #endif
2856
2857 #if defined(GRAN)
2858 StgBlockingQueueElement *
2859 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2860 {
2861     StgTSO *tso;
2862     PEs node_loc, tso_loc;
2863
2864     node_loc = where_is(node); // should be lifted out of loop
2865     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2866     tso_loc = where_is((StgClosure *)tso);
2867     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2868       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2869       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2870       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2871       // insertThread(tso, node_loc);
2872       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2873                 ResumeThread,
2874                 tso, node, (rtsSpark*)NULL);
2875       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2876       // len_local++;
2877       // len++;
2878     } else { // TSO is remote (actually should be FMBQ)
2879       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2880                                   RtsFlags.GranFlags.Costs.gunblocktime +
2881                                   RtsFlags.GranFlags.Costs.latency;
2882       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2883                 UnblockThread,
2884                 tso, node, (rtsSpark*)NULL);
2885       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2886       // len++;
2887     }
2888     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2889     IF_GRAN_DEBUG(bq,
2890                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2891                           (node_loc==tso_loc ? "Local" : "Global"), 
2892                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2893     tso->block_info.closure = NULL;
2894     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2895                              tso->id, tso));
2896 }
2897 #elif defined(PARALLEL_HASKELL)
2898 StgBlockingQueueElement *
2899 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2900 {
2901     StgBlockingQueueElement *next;
2902
2903     switch (get_itbl(bqe)->type) {
2904     case TSO:
2905       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2906       /* if it's a TSO just push it onto the run_queue */
2907       next = bqe->link;
2908       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2909       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
2910       threadRunnable();
2911       unblockCount(bqe, node);
2912       /* reset blocking status after dumping event */
2913       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2914       break;
2915
2916     case BLOCKED_FETCH:
2917       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2918       next = bqe->link;
2919       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2920       PendingFetches = (StgBlockedFetch *)bqe;
2921       break;
2922
2923 # if defined(DEBUG)
2924       /* can ignore this case in a non-debugging setup; 
2925          see comments on RBHSave closures above */
2926     case CONSTR:
2927       /* check that the closure is an RBHSave closure */
2928       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2929              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2930              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2931       break;
2932
2933     default:
2934       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2935            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2936            (StgClosure *)bqe);
2937 # endif
2938     }
2939   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
2940   return next;
2941 }
2942 #endif
2943
2944 StgTSO *
2945 unblockOne(Capability *cap, StgTSO *tso)
2946 {
2947   StgTSO *next;
2948
2949   ASSERT(get_itbl(tso)->type == TSO);
2950   ASSERT(tso->why_blocked != NotBlocked);
2951   tso->why_blocked = NotBlocked;
2952   next = tso->link;
2953   tso->link = END_TSO_QUEUE;
2954
2955   // We might have just migrated this TSO to our Capability:
2956   if (tso->bound) {
2957       tso->bound->cap = cap;
2958   }
2959
2960   appendToRunQueue(cap,tso);
2961
2962   // we're holding a newly woken thread, make sure we context switch
2963   // quickly so we can migrate it if necessary.
2964   context_switch = 1;
2965   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
2966   return next;
2967 }
2968
2969
2970 #if defined(GRAN)
2971 void 
2972 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2973 {
2974   StgBlockingQueueElement *bqe;
2975   PEs node_loc;
2976   nat len = 0; 
2977
2978   IF_GRAN_DEBUG(bq, 
2979                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
2980                       node, CurrentProc, CurrentTime[CurrentProc], 
2981                       CurrentTSO->id, CurrentTSO));
2982
2983   node_loc = where_is(node);
2984
2985   ASSERT(q == END_BQ_QUEUE ||
2986          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2987          get_itbl(q)->type == CONSTR); // closure (type constructor)
2988   ASSERT(is_unique(node));
2989
2990   /* FAKE FETCH: magically copy the node to the tso's proc;
2991      no Fetch necessary because in reality the node should not have been 
2992      moved to the other PE in the first place
2993   */
2994   if (CurrentProc!=node_loc) {
2995     IF_GRAN_DEBUG(bq, 
2996                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
2997                         node, node_loc, CurrentProc, CurrentTSO->id, 
2998                         // CurrentTSO, where_is(CurrentTSO),
2999                         node->header.gran.procs));
3000     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3001     IF_GRAN_DEBUG(bq, 
3002                   debugBelch("## new bitmask of node %p is %#x\n",
3003                         node, node->header.gran.procs));
3004     if (RtsFlags.GranFlags.GranSimStats.Global) {
3005       globalGranStats.tot_fake_fetches++;
3006     }
3007   }
3008
3009   bqe = q;
3010   // ToDo: check: ASSERT(CurrentProc==node_loc);
3011   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3012     //next = bqe->link;
3013     /* 
3014        bqe points to the current element in the queue
3015        next points to the next element in the queue
3016     */
3017     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3018     //tso_loc = where_is(tso);
3019     len++;
3020     bqe = unblockOne(bqe, node);
3021   }
3022
3023   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3024      the closure to make room for the anchor of the BQ */
3025   if (bqe!=END_BQ_QUEUE) {
3026     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3027     /*
3028     ASSERT((info_ptr==&RBH_Save_0_info) ||
3029            (info_ptr==&RBH_Save_1_info) ||
3030            (info_ptr==&RBH_Save_2_info));
3031     */
3032     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3033     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3034     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3035
3036     IF_GRAN_DEBUG(bq,
3037                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3038                         node, info_type(node)));
3039   }
3040
3041   /* statistics gathering */
3042   if (RtsFlags.GranFlags.GranSimStats.Global) {
3043     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3044     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3045     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3046     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3047   }
3048   IF_GRAN_DEBUG(bq,
3049                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3050                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3051 }
3052 #elif defined(PARALLEL_HASKELL)
3053 void 
3054 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3055 {
3056   StgBlockingQueueElement *bqe;
3057
3058   IF_PAR_DEBUG(verbose, 
3059                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3060                      node, mytid));
3061 #ifdef DIST  
3062   //RFP
3063   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3064     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3065     return;
3066   }
3067 #endif
3068   
3069   ASSERT(q == END_BQ_QUEUE ||
3070          get_itbl(q)->type == TSO ||           
3071          get_itbl(q)->type == BLOCKED_FETCH || 
3072          get_itbl(q)->type == CONSTR); 
3073
3074   bqe = q;
3075   while (get_itbl(bqe)->type==TSO || 
3076          get_itbl(bqe)->type==BLOCKED_FETCH) {
3077     bqe = unblockOne(bqe, node);
3078   }
3079 }
3080
3081 #else   /* !GRAN && !PARALLEL_HASKELL */
3082
3083 void
3084 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3085 {
3086     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3087                              // Exception.cmm
3088     while (tso != END_TSO_QUEUE) {
3089         tso = unblockOne(cap,tso);
3090     }
3091 }
3092 #endif
3093
3094 /* ---------------------------------------------------------------------------
3095    Interrupt execution
3096    - usually called inside a signal handler so it mustn't do anything fancy.   
3097    ------------------------------------------------------------------------ */
3098
3099 void
3100 interruptStgRts(void)
3101 {
3102     interrupted    = 1;
3103     context_switch = 1;
3104 #if defined(THREADED_RTS)
3105     prodAllCapabilities();
3106 #endif
3107 }
3108
3109 /* -----------------------------------------------------------------------------
3110    Unblock a thread
3111
3112    This is for use when we raise an exception in another thread, which
3113    may be blocked.
3114    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3115    -------------------------------------------------------------------------- */
3116
3117 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3118 /*
3119   NB: only the type of the blocking queue is different in GranSim and GUM
3120       the operations on the queue-elements are the same
3121       long live polymorphism!
3122
3123   Locks: sched_mutex is held upon entry and exit.
3124
3125 */
3126 static void
3127 unblockThread(Capability *cap, StgTSO *tso)
3128 {
3129   StgBlockingQueueElement *t, **last;
3130
3131   switch (tso->why_blocked) {
3132
3133   case NotBlocked:
3134     return;  /* not blocked */
3135
3136   case BlockedOnSTM:
3137     // Be careful: nothing to do here!  We tell the scheduler that the thread
3138     // is runnable and we leave it to the stack-walking code to abort the 
3139     // transaction while unwinding the stack.  We should perhaps have a debugging
3140     // test to make sure that this really happens and that the 'zombie' transaction
3141     // does not get committed.
3142     goto done;
3143
3144   case BlockedOnMVar:
3145     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3146     {
3147       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3148       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3149
3150       last = (StgBlockingQueueElement **)&mvar->head;
3151       for (t = (StgBlockingQueueElement *)mvar->head; 
3152            t != END_BQ_QUEUE; 
3153            last = &t->link, last_tso = t, t = t->link) {
3154         if (t == (StgBlockingQueueElement *)tso) {
3155           *last = (StgBlockingQueueElement *)tso->link;
3156           if (mvar->tail == tso) {
3157             mvar->tail = (StgTSO *)last_tso;
3158           }
3159           goto done;
3160         }
3161       }
3162       barf("unblockThread (MVAR): TSO not found");
3163     }
3164
3165   case BlockedOnBlackHole:
3166     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3167     {
3168       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3169
3170       last = &bq->blocking_queue;
3171       for (t = bq->blocking_queue; 
3172            t != END_BQ_QUEUE; 
3173            last = &t->link, t = t->link) {
3174         if (t == (StgBlockingQueueElement *)tso) {
3175           *last = (StgBlockingQueueElement *)tso->link;
3176           goto done;
3177         }
3178       }
3179       barf("unblockThread (BLACKHOLE): TSO not found");
3180     }
3181
3182   case BlockedOnException:
3183     {
3184       StgTSO *target  = tso->block_info.tso;
3185
3186       ASSERT(get_itbl(target)->type == TSO);
3187
3188       if (target->what_next == ThreadRelocated) {
3189           target = target->link;
3190           ASSERT(get_itbl(target)->type == TSO);
3191       }
3192
3193       ASSERT(target->blocked_exceptions != NULL);
3194
3195       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3196       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3197            t != END_BQ_QUEUE; 
3198            last = &t->link, t = t->link) {
3199         ASSERT(get_itbl(t)->type == TSO);
3200         if (t == (StgBlockingQueueElement *)tso) {
3201           *last = (StgBlockingQueueElement *)tso->link;
3202           goto done;
3203         }
3204       }
3205       barf("unblockThread (Exception): TSO not found");
3206     }
3207
3208   case BlockedOnRead:
3209   case BlockedOnWrite:
3210 #if defined(mingw32_HOST_OS)
3211   case BlockedOnDoProc:
3212 #endif
3213     {
3214       /* take TSO off blocked_queue */
3215       StgBlockingQueueElement *prev = NULL;
3216       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3217            prev = t, t = t->link) {
3218         if (t == (StgBlockingQueueElement *)tso) {
3219           if (prev == NULL) {
3220             blocked_queue_hd = (StgTSO *)t->link;
3221             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3222               blocked_queue_tl = END_TSO_QUEUE;
3223             }
3224           } else {
3225             prev->link = t->link;
3226             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3227               blocked_queue_tl = (StgTSO *)prev;
3228             }
3229           }
3230 #if defined(mingw32_HOST_OS)
3231           /* (Cooperatively) signal that the worker thread should abort
3232            * the request.
3233            */
3234           abandonWorkRequest(tso->block_info.async_result->reqID);
3235 #endif
3236           goto done;
3237         }
3238       }
3239       barf("unblockThread (I/O): TSO not found");
3240     }
3241
3242   case BlockedOnDelay:
3243     {
3244       /* take TSO off sleeping_queue */
3245       StgBlockingQueueElement *prev = NULL;
3246       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3247            prev = t, t = t->link) {
3248         if (t == (StgBlockingQueueElement *)tso) {
3249           if (prev == NULL) {
3250             sleeping_queue = (StgTSO *)t->link;
3251           } else {
3252             prev->link = t->link;
3253           }
3254           goto done;
3255         }
3256       }
3257       barf("unblockThread (delay): TSO not found");
3258     }
3259
3260   default:
3261     barf("unblockThread");
3262   }
3263
3264  done:
3265   tso->link = END_TSO_QUEUE;
3266   tso->why_blocked = NotBlocked;
3267   tso->block_info.closure = NULL;
3268   pushOnRunQueue(cap,tso);
3269 }
3270 #else
3271 static void
3272 unblockThread(Capability *cap, StgTSO *tso)
3273 {
3274   StgTSO *t, **last;
3275   
3276   /* To avoid locking unnecessarily. */
3277   if (tso->why_blocked == NotBlocked) {
3278     return;
3279   }
3280
3281   switch (tso->why_blocked) {
3282
3283   case BlockedOnSTM:
3284     // Be careful: nothing to do here!  We tell the scheduler that the thread
3285     // is runnable and we leave it to the stack-walking code to abort the 
3286     // transaction while unwinding the stack.  We should perhaps have a debugging
3287     // test to make sure that this really happens and that the 'zombie' transaction
3288     // does not get committed.
3289     goto done;
3290
3291   case BlockedOnMVar:
3292     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3293     {
3294       StgTSO *last_tso = END_TSO_QUEUE;
3295       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3296
3297       last = &mvar->head;
3298       for (t = mvar->head; t != END_TSO_QUEUE; 
3299            last = &t->link, last_tso = t, t = t->link) {
3300         if (t == tso) {
3301           *last = tso->link;
3302           if (mvar->tail == tso) {
3303             mvar->tail = last_tso;
3304           }
3305           goto done;
3306         }
3307       }
3308       barf("unblockThread (MVAR): TSO not found");
3309     }
3310
3311   case BlockedOnBlackHole:
3312     {
3313       last = &blackhole_queue;
3314       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3315            last = &t->link, t = t->link) {
3316         if (t == tso) {
3317           *last = tso->link;
3318           goto done;
3319         }
3320       }
3321       barf("unblockThread (BLACKHOLE): TSO not found");
3322     }
3323
3324   case BlockedOnException:
3325     {
3326       StgTSO *target  = tso->block_info.tso;
3327
3328       ASSERT(get_itbl(target)->type == TSO);
3329
3330       while (target->what_next == ThreadRelocated) {
3331           target = target->link;
3332           ASSERT(get_itbl(target)->type == TSO);
3333       }
3334       
3335       ASSERT(target->blocked_exceptions != NULL);
3336
3337       last = &target->blocked_exceptions;
3338       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3339            last = &t->link, t = t->link) {
3340         ASSERT(get_itbl(t)->type == TSO);
3341         if (t == tso) {
3342           *last = tso->link;
3343           goto done;
3344         }
3345       }
3346       barf("unblockThread (Exception): TSO not found");
3347     }
3348
3349 #if !defined(THREADED_RTS)
3350   case BlockedOnRead:
3351   case BlockedOnWrite:
3352 #if defined(mingw32_HOST_OS)
3353   case BlockedOnDoProc:
3354 #endif
3355     {
3356       StgTSO *prev = NULL;
3357       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3358            prev = t, t = t->link) {
3359         if (t == tso) {
3360           if (prev == NULL) {
3361             blocked_queue_hd = t->link;
3362             if (blocked_queue_tl == t) {
3363               blocked_queue_tl = END_TSO_QUEUE;
3364             }
3365           } else {
3366             prev->link = t->link;
3367             if (blocked_queue_tl == t) {
3368               blocked_queue_tl = prev;
3369             }
3370           }
3371 #if defined(mingw32_HOST_OS)
3372           /* (Cooperatively) signal that the worker thread should abort
3373            * the request.
3374            */
3375           abandonWorkRequest(tso->block_info.async_result->reqID);
3376 #endif
3377           goto done;
3378         }
3379       }
3380       barf("unblockThread (I/O): TSO not found");
3381     }
3382
3383   case BlockedOnDelay:
3384     {
3385       StgTSO *prev = NULL;
3386       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3387            prev = t, t = t->link) {
3388         if (t == tso) {
3389           if (prev == NULL) {
3390             sleeping_queue = t->link;
3391           } else {
3392             prev->link = t->link;
3393           }
3394           goto done;
3395         }
3396       }
3397       barf("unblockThread (delay): TSO not found");
3398     }
3399 #endif
3400
3401   default:
3402     barf("unblockThread");
3403   }
3404
3405  done:
3406   tso->link = END_TSO_QUEUE;
3407   tso->why_blocked = NotBlocked;
3408   tso->block_info.closure = NULL;
3409   appendToRunQueue(cap,tso);
3410 }
3411 #endif
3412
3413 /* -----------------------------------------------------------------------------
3414  * checkBlackHoles()
3415  *
3416  * Check the blackhole_queue for threads that can be woken up.  We do
3417  * this periodically: before every GC, and whenever the run queue is
3418  * empty.
3419  *
3420  * An elegant solution might be to just wake up all the blocked
3421  * threads with awakenBlockedQueue occasionally: they'll go back to
3422  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3423  * doesn't give us a way to tell whether we've actually managed to
3424  * wake up any threads, so we would be busy-waiting.
3425  *
3426  * -------------------------------------------------------------------------- */
3427
3428 static rtsBool
3429 checkBlackHoles (Capability *cap)
3430 {
3431     StgTSO **prev, *t;
3432     rtsBool any_woke_up = rtsFalse;
3433     StgHalfWord type;
3434
3435     // blackhole_queue is global:
3436     ASSERT_LOCK_HELD(&sched_mutex);
3437
3438     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3439
3440     // ASSUMES: sched_mutex
3441     prev = &blackhole_queue;
3442     t = blackhole_queue;
3443     while (t != END_TSO_QUEUE) {
3444         ASSERT(t->why_blocked == BlockedOnBlackHole);
3445         type = get_itbl(t->block_info.closure)->type;
3446         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3447             IF_DEBUG(sanity,checkTSO(t));
3448             t = unblockOne(cap, t);
3449             // urk, the threads migrate to the current capability
3450             // here, but we'd like to keep them on the original one.
3451             *prev = t;
3452             any_woke_up = rtsTrue;
3453         } else {
3454             prev = &t->link;
3455             t = t->link;
3456         }
3457     }
3458
3459     return any_woke_up;
3460 }
3461
3462 /* -----------------------------------------------------------------------------
3463  * raiseAsync()
3464  *
3465  * The following function implements the magic for raising an
3466  * asynchronous exception in an existing thread.
3467  *
3468  * We first remove the thread from any queue on which it might be
3469  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3470  *
3471  * We strip the stack down to the innermost CATCH_FRAME, building
3472  * thunks in the heap for all the active computations, so they can 
3473  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3474  * an application of the handler to the exception, and push it on
3475  * the top of the stack.
3476  * 
3477  * How exactly do we save all the active computations?  We create an
3478  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3479  * AP_STACKs pushes everything from the corresponding update frame
3480  * upwards onto the stack.  (Actually, it pushes everything up to the
3481  * next update frame plus a pointer to the next AP_STACK object.
3482  * Entering the next AP_STACK object pushes more onto the stack until we
3483  * reach the last AP_STACK object - at which point the stack should look
3484  * exactly as it did when we killed the TSO and we can continue
3485  * execution by entering the closure on top of the stack.
3486  *
3487  * We can also kill a thread entirely - this happens if either (a) the 
3488  * exception passed to raiseAsync is NULL, or (b) there's no
3489  * CATCH_FRAME on the stack.  In either case, we strip the entire
3490  * stack and replace the thread with a zombie.
3491  *
3492  * ToDo: in SMP mode, this function is only safe if either (a) we hold
3493  * all the Capabilities (eg. in GC), or (b) we own the Capability that
3494  * the TSO is currently blocked on or on the run queue of.
3495  *
3496  * -------------------------------------------------------------------------- */
3497  
3498 void
3499 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3500 {
3501     raiseAsync_(cap, tso, exception, rtsFalse);
3502 }
3503
3504 static void
3505 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3506             rtsBool stop_at_atomically)
3507 {
3508     StgRetInfoTable *info;
3509     StgPtr sp;
3510   
3511     // Thread already dead?
3512     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3513         return;
3514     }
3515
3516     IF_DEBUG(scheduler, 
3517              sched_belch("raising exception in thread %ld.", (long)tso->id));
3518     
3519     // Remove it from any blocking queues
3520     unblockThread(cap,tso);
3521
3522     sp = tso->sp;
3523     
3524     // The stack freezing code assumes there's a closure pointer on
3525     // the top of the stack, so we have to arrange that this is the case...
3526     //
3527     if (sp[0] == (W_)&stg_enter_info) {
3528         sp++;
3529     } else {
3530         sp--;
3531         sp[0] = (W_)&stg_dummy_ret_closure;
3532     }
3533
3534     while (1) {
3535         nat i;
3536
3537         // 1. Let the top of the stack be the "current closure"
3538         //
3539         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3540         // CATCH_FRAME.
3541         //
3542         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3543         // current closure applied to the chunk of stack up to (but not
3544         // including) the update frame.  This closure becomes the "current
3545         // closure".  Go back to step 2.
3546         //
3547         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3548         // top of the stack applied to the exception.
3549         // 
3550         // 5. If it's a STOP_FRAME, then kill the thread.
3551         // 
3552         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3553         // transaction
3554        
3555         
3556         StgPtr frame;
3557         
3558         frame = sp + 1;
3559         info = get_ret_itbl((StgClosure *)frame);
3560         
3561         while (info->i.type != UPDATE_FRAME
3562                && (info->i.type != CATCH_FRAME || exception == NULL)
3563                && info->i.type != STOP_FRAME
3564                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3565         {
3566             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3567               // IF we find an ATOMICALLY_FRAME then we abort the
3568               // current transaction and propagate the exception.  In
3569               // this case (unlike ordinary exceptions) we do not care
3570               // whether the transaction is valid or not because its
3571               // possible validity cannot have caused the exception
3572               // and will not be visible after the abort.
3573               IF_DEBUG(stm,
3574                        debugBelch("Found atomically block delivering async exception\n"));
3575               stmAbortTransaction(tso -> trec);
3576               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3577             }
3578             frame += stack_frame_sizeW((StgClosure *)frame);
3579             info = get_ret_itbl((StgClosure *)frame);
3580         }
3581         
3582         switch (info->i.type) {
3583             
3584         case ATOMICALLY_FRAME:
3585             ASSERT(stop_at_atomically);
3586             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3587             stmCondemnTransaction(tso -> trec);
3588 #ifdef REG_R1
3589             tso->sp = frame;
3590 #else
3591             // R1 is not a register: the return convention for IO in
3592             // this case puts the return value on the stack, so we
3593             // need to set up the stack to return to the atomically
3594             // frame properly...
3595             tso->sp = frame - 2;
3596             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3597             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3598 #endif
3599             tso->what_next = ThreadRunGHC;
3600             return;
3601
3602         case CATCH_FRAME:
3603             // If we find a CATCH_FRAME, and we've got an exception to raise,
3604             // then build the THUNK raise(exception), and leave it on
3605             // top of the CATCH_FRAME ready to enter.
3606             //
3607         {
3608 #ifdef PROFILING
3609             StgCatchFrame *cf = (StgCatchFrame *)frame;
3610 #endif
3611             StgThunk *raise;
3612             
3613             // we've got an exception to raise, so let's pass it to the
3614             // handler in this frame.
3615             //
3616             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3617             TICK_ALLOC_SE_THK(1,0);
3618             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3619             raise->payload[0] = exception;
3620             
3621             // throw away the stack from Sp up to the CATCH_FRAME.
3622             //
3623             sp = frame - 1;
3624             
3625             /* Ensure that async excpetions are blocked now, so we don't get
3626              * a surprise exception before we get around to executing the
3627              * handler.
3628              */
3629             if (tso->blocked_exceptions == NULL) {
3630                 tso->blocked_exceptions = END_TSO_QUEUE;
3631             }
3632             
3633             /* Put the newly-built THUNK on top of the stack, ready to execute
3634              * when the thread restarts.
3635              */
3636             sp[0] = (W_)raise;
3637             sp[-1] = (W_)&stg_enter_info;
3638             tso->sp = sp-1;
3639             tso->what_next = ThreadRunGHC;
3640             IF_DEBUG(sanity, checkTSO(tso));
3641             return;
3642         }
3643         
3644         case UPDATE_FRAME:
3645         {
3646             StgAP_STACK * ap;
3647             nat words;
3648             
3649             // First build an AP_STACK consisting of the stack chunk above the
3650             // current update frame, with the top word on the stack as the
3651             // fun field.
3652             //
3653             words = frame - sp - 1;
3654             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3655             
3656             ap->size = words;
3657             ap->fun  = (StgClosure *)sp[0];
3658             sp++;
3659             for(i=0; i < (nat)words; ++i) {
3660                 ap->payload[i] = (StgClosure *)*sp++;
3661             }
3662             
3663             SET_HDR(ap,&stg_AP_STACK_info,
3664                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3665             TICK_ALLOC_UP_THK(words+1,0);
3666             
3667             IF_DEBUG(scheduler,
3668                      debugBelch("sched: Updating ");
3669                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3670                      debugBelch(" with ");
3671                      printObj((StgClosure *)ap);
3672                 );
3673
3674             // Replace the updatee with an indirection - happily
3675             // this will also wake up any threads currently
3676             // waiting on the result.
3677             //
3678             // Warning: if we're in a loop, more than one update frame on
3679             // the stack may point to the same object.  Be careful not to
3680             // overwrite an IND_OLDGEN in this case, because we'll screw
3681             // up the mutable lists.  To be on the safe side, don't
3682             // overwrite any kind of indirection at all.  See also
3683             // threadSqueezeStack in GC.c, where we have to make a similar
3684             // check.
3685             //
3686             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3687                 // revert the black hole
3688                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3689                                (StgClosure *)ap);
3690             }
3691             sp += sizeofW(StgUpdateFrame) - 1;
3692             sp[0] = (W_)ap; // push onto stack
3693             break;
3694         }
3695         
3696         case STOP_FRAME:
3697             // We've stripped the entire stack, the thread is now dead.
3698             sp += sizeofW(StgStopFrame);
3699             tso->what_next = ThreadKilled;
3700             tso->sp = sp;
3701             return;
3702             
3703         default:
3704             barf("raiseAsync");
3705         }
3706     }
3707     barf("raiseAsync");
3708 }
3709
3710 /* -----------------------------------------------------------------------------
3711    Deleting threads
3712
3713    This is used for interruption (^C) and forking, and corresponds to
3714    raising an exception but without letting the thread catch the
3715    exception.
3716    -------------------------------------------------------------------------- */
3717
3718 static void 
3719 deleteThread (Capability *cap, StgTSO *tso)
3720 {
3721   if (tso->why_blocked != BlockedOnCCall &&
3722       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3723       raiseAsync(cap,tso,NULL);
3724   }
3725 }
3726
3727 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3728 static void 
3729 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3730 { // for forkProcess only:
3731   // delete thread without giving it a chance to catch the KillThread exception
3732
3733   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3734       return;
3735   }
3736
3737   if (tso->why_blocked != BlockedOnCCall &&
3738       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3739       unblockThread(cap,tso);
3740   }
3741
3742   tso->what_next = ThreadKilled;
3743 }
3744 #endif
3745
3746 /* -----------------------------------------------------------------------------
3747    raiseExceptionHelper
3748    
3749    This function is called by the raise# primitve, just so that we can
3750    move some of the tricky bits of raising an exception from C-- into
3751    C.  Who knows, it might be a useful re-useable thing here too.
3752    -------------------------------------------------------------------------- */
3753
3754 StgWord
3755 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3756 {
3757     Capability *cap = regTableToCapability(reg);
3758     StgThunk *raise_closure = NULL;
3759     StgPtr p, next;
3760     StgRetInfoTable *info;
3761     //
3762     // This closure represents the expression 'raise# E' where E
3763     // is the exception raise.  It is used to overwrite all the
3764     // thunks which are currently under evaluataion.
3765     //
3766
3767     //    
3768     // LDV profiling: stg_raise_info has THUNK as its closure
3769     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3770     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3771     // 1 does not cause any problem unless profiling is performed.
3772     // However, when LDV profiling goes on, we need to linearly scan
3773     // small object pool, where raise_closure is stored, so we should
3774     // use MIN_UPD_SIZE.
3775     //
3776     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3777     //                                 sizeofW(StgClosure)+1);
3778     //
3779
3780     //
3781     // Walk up the stack, looking for the catch frame.  On the way,
3782     // we update any closures pointed to from update frames with the
3783     // raise closure that we just built.
3784     //
3785     p = tso->sp;
3786     while(1) {
3787         info = get_ret_itbl((StgClosure *)p);
3788         next = p + stack_frame_sizeW((StgClosure *)p);
3789         switch (info->i.type) {
3790             
3791         case UPDATE_FRAME:
3792             // Only create raise_closure if we need to.
3793             if (raise_closure == NULL) {
3794                 raise_closure = 
3795                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3796                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3797                 raise_closure->payload[0] = exception;
3798             }
3799             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3800             p = next;
3801             continue;
3802
3803         case ATOMICALLY_FRAME:
3804             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3805             tso->sp = p;
3806             return ATOMICALLY_FRAME;
3807             
3808         case CATCH_FRAME:
3809             tso->sp = p;
3810             return CATCH_FRAME;
3811
3812         case CATCH_STM_FRAME:
3813             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3814             tso->sp = p;
3815             return CATCH_STM_FRAME;
3816             
3817         case STOP_FRAME:
3818             tso->sp = p;
3819             return STOP_FRAME;
3820
3821         case CATCH_RETRY_FRAME:
3822         default:
3823             p = next; 
3824             continue;
3825         }
3826     }
3827 }
3828
3829
3830 /* -----------------------------------------------------------------------------
3831    findRetryFrameHelper
3832
3833    This function is called by the retry# primitive.  It traverses the stack
3834    leaving tso->sp referring to the frame which should handle the retry.  
3835
3836    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3837    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3838
3839    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3840    despite the similar implementation.
3841
3842    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3843    not be created within memory transactions.
3844    -------------------------------------------------------------------------- */
3845
3846 StgWord
3847 findRetryFrameHelper (StgTSO *tso)
3848 {
3849   StgPtr           p, next;
3850   StgRetInfoTable *info;
3851
3852   p = tso -> sp;
3853   while (1) {
3854     info = get_ret_itbl((StgClosure *)p);
3855     next = p + stack_frame_sizeW((StgClosure *)p);
3856     switch (info->i.type) {
3857       
3858     case ATOMICALLY_FRAME:
3859       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3860       tso->sp = p;
3861       return ATOMICALLY_FRAME;
3862       
3863     case CATCH_RETRY_FRAME:
3864       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3865       tso->sp = p;
3866       return CATCH_RETRY_FRAME;
3867       
3868     case CATCH_STM_FRAME:
3869     default:
3870       ASSERT(info->i.type != CATCH_FRAME);
3871       ASSERT(info->i.type != STOP_FRAME);
3872       p = next; 
3873       continue;
3874     }
3875   }
3876 }
3877
3878 /* -----------------------------------------------------------------------------
3879    resurrectThreads is called after garbage collection on the list of
3880    threads found to be garbage.  Each of these threads will be woken
3881    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3882    on an MVar, or NonTermination if the thread was blocked on a Black
3883    Hole.
3884
3885    Locks: assumes we hold *all* the capabilities.
3886    -------------------------------------------------------------------------- */
3887
3888 void
3889 resurrectThreads (StgTSO *threads)
3890 {
3891     StgTSO *tso, *next;
3892     Capability *cap;
3893
3894     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3895         next = tso->global_link;
3896         tso->global_link = all_threads;
3897         all_threads = tso;
3898         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3899         
3900         // Wake up the thread on the Capability it was last on for a
3901         // bound thread, or last_free_capability otherwise.
3902         if (tso->bound) {
3903             cap = tso->bound->cap;
3904         } else {
3905             cap = last_free_capability;
3906         }
3907         
3908         switch (tso->why_blocked) {
3909         case BlockedOnMVar:
3910         case BlockedOnException:
3911             /* Called by GC - sched_mutex lock is currently held. */
3912             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
3913             break;
3914         case BlockedOnBlackHole:
3915             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
3916             break;
3917         case BlockedOnSTM:
3918             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
3919             break;
3920         case NotBlocked:
3921             /* This might happen if the thread was blocked on a black hole
3922              * belonging to a thread that we've just woken up (raiseAsync
3923              * can wake up threads, remember...).
3924              */
3925             continue;
3926         default:
3927             barf("resurrectThreads: thread blocked in a strange way");
3928         }
3929     }
3930 }
3931
3932 /* ----------------------------------------------------------------------------
3933  * Debugging: why is a thread blocked
3934  * [Also provides useful information when debugging threaded programs
3935  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3936    ------------------------------------------------------------------------- */
3937
3938 #if DEBUG
3939 static void
3940 printThreadBlockage(StgTSO *tso)
3941 {
3942   switch (tso->why_blocked) {
3943   case BlockedOnRead:
3944     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
3945     break;
3946   case BlockedOnWrite:
3947     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
3948     break;
3949 #if defined(mingw32_HOST_OS)
3950     case BlockedOnDoProc:
3951     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
3952     break;
3953 #endif
3954   case BlockedOnDelay:
3955     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
3956     break;
3957   case BlockedOnMVar:
3958     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
3959     break;
3960   case BlockedOnException:
3961     debugBelch("is blocked on delivering an exception to thread %d",
3962             tso->block_info.tso->id);
3963     break;
3964   case BlockedOnBlackHole:
3965     debugBelch("is blocked on a black hole");
3966     break;
3967   case NotBlocked:
3968     debugBelch("is not blocked");
3969     break;
3970 #if defined(PARALLEL_HASKELL)
3971   case BlockedOnGA:
3972     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
3973             tso->block_info.closure, info_type(tso->block_info.closure));
3974     break;
3975   case BlockedOnGA_NoSend:
3976     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
3977             tso->block_info.closure, info_type(tso->block_info.closure));
3978     break;
3979 #endif
3980   case BlockedOnCCall:
3981     debugBelch("is blocked on an external call");
3982     break;
3983   case BlockedOnCCall_NoUnblockExc:
3984     debugBelch("is blocked on an external call (exceptions were already blocked)");
3985     break;
3986   case BlockedOnSTM:
3987     debugBelch("is blocked on an STM operation");
3988     break;
3989   default:
3990     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3991          tso->why_blocked, tso->id, tso);
3992   }
3993 }
3994
3995 static void
3996 printThreadStatus(StgTSO *tso)
3997 {
3998   switch (tso->what_next) {
3999   case ThreadKilled:
4000     debugBelch("has been killed");
4001     break;
4002   case ThreadComplete:
4003     debugBelch("has completed");
4004     break;
4005   default:
4006     printThreadBlockage(tso);
4007   }
4008 }
4009
4010 void
4011 printAllThreads(void)
4012 {
4013   StgTSO *t;
4014
4015 # if defined(GRAN)
4016   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4017   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4018                        time_string, rtsFalse/*no commas!*/);
4019
4020   debugBelch("all threads at [%s]:\n", time_string);
4021 # elif defined(PARALLEL_HASKELL)
4022   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4023   ullong_format_string(CURRENT_TIME,
4024                        time_string, rtsFalse/*no commas!*/);
4025
4026   debugBelch("all threads at [%s]:\n", time_string);
4027 # else
4028   debugBelch("all threads:\n");
4029 # endif
4030
4031   for (t = all_threads; t != END_TSO_QUEUE; ) {
4032     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4033     {
4034       void *label = lookupThreadLabel(t->id);
4035       if (label) debugBelch("[\"%s\"] ",(char *)label);
4036     }
4037     if (t->what_next == ThreadRelocated) {
4038         debugBelch("has been relocated...\n");
4039         t = t->link;
4040     } else {
4041         printThreadStatus(t);
4042         debugBelch("\n");
4043         t = t->global_link;
4044     }
4045   }
4046 }
4047
4048 // useful from gdb
4049 void 
4050 printThreadQueue(StgTSO *t)
4051 {
4052     nat i = 0;
4053     for (; t != END_TSO_QUEUE; t = t->link) {
4054         debugBelch("\tthread %d @ %p ", t->id, (void *)t);
4055         if (t->what_next == ThreadRelocated) {
4056             debugBelch("has been relocated...\n");
4057         } else {
4058             printThreadStatus(t);
4059             debugBelch("\n");
4060         }
4061         i++;
4062     }
4063     debugBelch("%d threads on queue\n", i);
4064 }
4065
4066 /* 
4067    Print a whole blocking queue attached to node (debugging only).
4068 */
4069 # if defined(PARALLEL_HASKELL)
4070 void 
4071 print_bq (StgClosure *node)
4072 {
4073   StgBlockingQueueElement *bqe;
4074   StgTSO *tso;
4075   rtsBool end;
4076
4077   debugBelch("## BQ of closure %p (%s): ",
4078           node, info_type(node));
4079
4080   /* should cover all closures that may have a blocking queue */
4081   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4082          get_itbl(node)->type == FETCH_ME_BQ ||
4083          get_itbl(node)->type == RBH ||
4084          get_itbl(node)->type == MVAR);
4085     
4086   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4087
4088   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4089 }
4090
4091 /* 
4092    Print a whole blocking queue starting with the element bqe.
4093 */
4094 void 
4095 print_bqe (StgBlockingQueueElement *bqe)
4096 {
4097   rtsBool end;
4098
4099   /* 
4100      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4101   */
4102   for (end = (bqe==END_BQ_QUEUE);
4103        !end; // iterate until bqe points to a CONSTR
4104        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4105        bqe = end ? END_BQ_QUEUE : bqe->link) {
4106     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4107     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4108     /* types of closures that may appear in a blocking queue */
4109     ASSERT(get_itbl(bqe)->type == TSO ||           
4110            get_itbl(bqe)->type == BLOCKED_FETCH || 
4111            get_itbl(bqe)->type == CONSTR); 
4112     /* only BQs of an RBH end with an RBH_Save closure */
4113     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4114
4115     switch (get_itbl(bqe)->type) {
4116     case TSO:
4117       debugBelch(" TSO %u (%x),",
4118               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4119       break;
4120     case BLOCKED_FETCH:
4121       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4122               ((StgBlockedFetch *)bqe)->node, 
4123               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4124               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4125               ((StgBlockedFetch *)bqe)->ga.weight);
4126       break;
4127     case CONSTR:
4128       debugBelch(" %s (IP %p),",
4129               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4130                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4131                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4132                "RBH_Save_?"), get_itbl(bqe));
4133       break;
4134     default:
4135       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4136            info_type((StgClosure *)bqe)); // , node, info_type(node));
4137       break;
4138     }
4139   } /* for */
4140   debugBelch("\n");
4141 }
4142 # elif defined(GRAN)
4143 void 
4144 print_bq (StgClosure *node)
4145 {
4146   StgBlockingQueueElement *bqe;
4147   PEs node_loc, tso_loc;
4148   rtsBool end;
4149
4150   /* should cover all closures that may have a blocking queue */
4151   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4152          get_itbl(node)->type == FETCH_ME_BQ ||
4153          get_itbl(node)->type == RBH);
4154     
4155   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4156   node_loc = where_is(node);
4157
4158   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4159           node, info_type(node), node_loc);
4160
4161   /* 
4162      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4163   */
4164   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4165        !end; // iterate until bqe points to a CONSTR
4166        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4167     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4168     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4169     /* types of closures that may appear in a blocking queue */
4170     ASSERT(get_itbl(bqe)->type == TSO ||           
4171            get_itbl(bqe)->type == CONSTR); 
4172     /* only BQs of an RBH end with an RBH_Save closure */
4173     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4174
4175     tso_loc = where_is((StgClosure *)bqe);
4176     switch (get_itbl(bqe)->type) {
4177     case TSO:
4178       debugBelch(" TSO %d (%p) on [PE %d],",
4179               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4180       break;
4181     case CONSTR:
4182       debugBelch(" %s (IP %p),",
4183               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4184                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4185                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4186                "RBH_Save_?"), get_itbl(bqe));
4187       break;
4188     default:
4189       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4190            info_type((StgClosure *)bqe), node, info_type(node));
4191       break;
4192     }
4193   } /* for */
4194   debugBelch("\n");
4195 }
4196 # endif
4197
4198 #if defined(PARALLEL_HASKELL)
4199 static nat
4200 run_queue_len(void)
4201 {
4202     nat i;
4203     StgTSO *tso;
4204     
4205     for (i=0, tso=run_queue_hd; 
4206          tso != END_TSO_QUEUE;
4207          i++, tso=tso->link) {
4208         /* nothing */
4209     }
4210         
4211     return i;
4212 }
4213 #endif
4214
4215 void
4216 sched_belch(char *s, ...)
4217 {
4218     va_list ap;
4219     va_start(ap,s);
4220 #ifdef THREADED_RTS
4221     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4222 #elif defined(PARALLEL_HASKELL)
4223     debugBelch("== ");
4224 #else
4225     debugBelch("sched: ");
4226 #endif
4227     vdebugBelch(s, ap);
4228     debugBelch("\n");
4229     va_end(ap);
4230 }
4231
4232 #endif /* DEBUG */