2bc7472a59bbb8a37e9baca598bc45803fcbf44a
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50
51 #ifdef HAVE_SYS_TYPES_H
52 #include <sys/types.h>
53 #endif
54 #ifdef HAVE_UNISTD_H
55 #include <unistd.h>
56 #endif
57
58 #include <string.h>
59 #include <stdlib.h>
60 #include <stdarg.h>
61
62 #ifdef HAVE_ERRNO_H
63 #include <errno.h>
64 #endif
65
66 // Turn off inlining when debugging - it obfuscates things
67 #ifdef DEBUG
68 # undef  STATIC_INLINE
69 # define STATIC_INLINE static
70 #endif
71
72 #ifdef THREADED_RTS
73 #define USED_WHEN_THREADED_RTS
74 #define USED_WHEN_NON_THREADED_RTS STG_UNUSED
75 #else
76 #define USED_WHEN_THREADED_RTS     STG_UNUSED
77 #define USED_WHEN_NON_THREADED_RTS
78 #endif
79
80 #ifdef SMP
81 #define USED_WHEN_SMP
82 #else
83 #define USED_WHEN_SMP STG_UNUSED
84 #endif
85
86 /* -----------------------------------------------------------------------------
87  * Global variables
88  * -------------------------------------------------------------------------- */
89
90 #if defined(GRAN)
91
92 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
93 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
94
95 /* 
96    In GranSim we have a runnable and a blocked queue for each processor.
97    In order to minimise code changes new arrays run_queue_hds/tls
98    are created. run_queue_hd is then a short cut (macro) for
99    run_queue_hds[CurrentProc] (see GranSim.h).
100    -- HWL
101 */
102 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
103 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
104 StgTSO *ccalling_threadss[MAX_PROC];
105 /* We use the same global list of threads (all_threads) in GranSim as in
106    the std RTS (i.e. we are cheating). However, we don't use this list in
107    the GranSim specific code at the moment (so we are only potentially
108    cheating).  */
109
110 #else /* !GRAN */
111
112 #if !defined(THREADED_RTS)
113 // Blocked/sleeping thrads
114 StgTSO *blocked_queue_hd = NULL;
115 StgTSO *blocked_queue_tl = NULL;
116 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
117 #endif
118
119 /* Threads blocked on blackholes.
120  * LOCK: sched_mutex+capability, or all capabilities
121  */
122 StgTSO *blackhole_queue = NULL;
123 #endif
124
125 /* The blackhole_queue should be checked for threads to wake up.  See
126  * Schedule.h for more thorough comment.
127  * LOCK: none (doesn't matter if we miss an update)
128  */
129 rtsBool blackholes_need_checking = rtsFalse;
130
131 /* Linked list of all threads.
132  * Used for detecting garbage collected threads.
133  * LOCK: sched_mutex+capability, or all capabilities
134  */
135 StgTSO *all_threads = NULL;
136
137 /* flag set by signal handler to precipitate a context switch
138  * LOCK: none (just an advisory flag)
139  */
140 int context_switch = 0;
141
142 /* flag that tracks whether we have done any execution in this time slice.
143  * LOCK: currently none, perhaps we should lock (but needs to be
144  * updated in the fast path of the scheduler).
145  */
146 nat recent_activity = ACTIVITY_YES;
147
148 /* if this flag is set as well, give up execution
149  * LOCK: none (changes once, from false->true)
150  */
151 rtsBool interrupted = rtsFalse;
152
153 /* Next thread ID to allocate.
154  * LOCK: sched_mutex
155  */
156 static StgThreadID next_thread_id = 1;
157
158 /* The smallest stack size that makes any sense is:
159  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
160  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
161  *  + 1                       (the closure to enter)
162  *  + 1                       (stg_ap_v_ret)
163  *  + 1                       (spare slot req'd by stg_ap_v_ret)
164  *
165  * A thread with this stack will bomb immediately with a stack
166  * overflow, which will increase its stack size.  
167  */
168 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
169
170 #if defined(GRAN)
171 StgTSO *CurrentTSO;
172 #endif
173
174 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
175  *  exists - earlier gccs apparently didn't.
176  *  -= chak
177  */
178 StgTSO dummy_tso;
179
180 /*
181  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
182  * in an MT setting, needed to signal that a worker thread shouldn't hang around
183  * in the scheduler when it is out of work.
184  */
185 rtsBool shutting_down_scheduler = rtsFalse;
186
187 /*
188  * This mutex protects most of the global scheduler data in
189  * the THREADED_RTS and (inc. SMP) runtime.
190  */
191 #if defined(THREADED_RTS)
192 Mutex sched_mutex = INIT_MUTEX_VAR;
193 #endif
194
195 #if defined(PARALLEL_HASKELL)
196 StgTSO *LastTSO;
197 rtsTime TimeOfLastYield;
198 rtsBool emitSchedule = rtsTrue;
199 #endif
200
201 /* -----------------------------------------------------------------------------
202  * static function prototypes
203  * -------------------------------------------------------------------------- */
204
205 static Capability *schedule (Capability *initialCapability, Task *task);
206
207 //
208 // These function all encapsulate parts of the scheduler loop, and are
209 // abstracted only to make the structure and control flow of the
210 // scheduler clearer.
211 //
212 static void schedulePreLoop (void);
213 static void schedulePushWork(Capability *cap, Task *task);
214 static void scheduleStartSignalHandlers (void);
215 static void scheduleCheckBlockedThreads (Capability *cap);
216 static void scheduleCheckBlackHoles (Capability *cap);
217 static void scheduleDetectDeadlock (Capability *cap, Task *task);
218 #if defined(GRAN)
219 static StgTSO *scheduleProcessEvent(rtsEvent *event);
220 #endif
221 #if defined(PARALLEL_HASKELL)
222 static StgTSO *scheduleSendPendingMessages(void);
223 static void scheduleActivateSpark(void);
224 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
225 #endif
226 #if defined(PAR) || defined(GRAN)
227 static void scheduleGranParReport(void);
228 #endif
229 static void schedulePostRunThread(void);
230 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
231 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
232                                          StgTSO *t);
233 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
234                                     nat prev_what_next );
235 static void scheduleHandleThreadBlocked( StgTSO *t );
236 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
237                                              StgTSO *t );
238 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
239 static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);
240
241 static void unblockThread(Capability *cap, StgTSO *tso);
242 static rtsBool checkBlackHoles(Capability *cap);
243 static void AllRoots(evac_fn evac);
244
245 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
246
247 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
248                         rtsBool stop_at_atomically);
249
250 static void deleteThread (Capability *cap, StgTSO *tso);
251 static void deleteRunQueue (Capability *cap);
252
253 #ifdef DEBUG
254 static void printThreadBlockage(StgTSO *tso);
255 static void printThreadStatus(StgTSO *tso);
256 void printThreadQueue(StgTSO *tso);
257 #endif
258
259 #if defined(PARALLEL_HASKELL)
260 StgTSO * createSparkThread(rtsSpark spark);
261 StgTSO * activateSpark (rtsSpark spark);  
262 #endif
263
264 #ifdef DEBUG
265 static char *whatNext_strs[] = {
266   "(unknown)",
267   "ThreadRunGHC",
268   "ThreadInterpret",
269   "ThreadKilled",
270   "ThreadRelocated",
271   "ThreadComplete"
272 };
273 #endif
274
275 /* -----------------------------------------------------------------------------
276  * Putting a thread on the run queue: different scheduling policies
277  * -------------------------------------------------------------------------- */
278
279 STATIC_INLINE void
280 addToRunQueue( Capability *cap, StgTSO *t )
281 {
282 #if defined(PARALLEL_HASKELL)
283     if (RtsFlags.ParFlags.doFairScheduling) { 
284         // this does round-robin scheduling; good for concurrency
285         appendToRunQueue(cap,t);
286     } else {
287         // this does unfair scheduling; good for parallelism
288         pushOnRunQueue(cap,t);
289     }
290 #else
291     // this does round-robin scheduling; good for concurrency
292     appendToRunQueue(cap,t);
293 #endif
294 }
295
296 /* ---------------------------------------------------------------------------
297    Main scheduling loop.
298
299    We use round-robin scheduling, each thread returning to the
300    scheduler loop when one of these conditions is detected:
301
302       * out of heap space
303       * timer expires (thread yields)
304       * thread blocks
305       * thread ends
306       * stack overflow
307
308    GRAN version:
309      In a GranSim setup this loop iterates over the global event queue.
310      This revolves around the global event queue, which determines what 
311      to do next. Therefore, it's more complicated than either the 
312      concurrent or the parallel (GUM) setup.
313
314    GUM version:
315      GUM iterates over incoming messages.
316      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
317      and sends out a fish whenever it has nothing to do; in-between
318      doing the actual reductions (shared code below) it processes the
319      incoming messages and deals with delayed operations 
320      (see PendingFetches).
321      This is not the ugliest code you could imagine, but it's bloody close.
322
323    ------------------------------------------------------------------------ */
324
325 static Capability *
326 schedule (Capability *initialCapability, Task *task)
327 {
328   StgTSO *t;
329   Capability *cap;
330   StgThreadReturnCode ret;
331 #if defined(GRAN)
332   rtsEvent *event;
333 #elif defined(PARALLEL_HASKELL)
334   StgTSO *tso;
335   GlobalTaskId pe;
336   rtsBool receivedFinish = rtsFalse;
337 # if defined(DEBUG)
338   nat tp_size, sp_size; // stats only
339 # endif
340 #endif
341   nat prev_what_next;
342   rtsBool ready_to_gc;
343   rtsBool first = rtsTrue;
344   
345   cap = initialCapability;
346
347   // Pre-condition: this task owns initialCapability.
348   // The sched_mutex is *NOT* held
349   // NB. on return, we still hold a capability.
350
351   IF_DEBUG(scheduler,
352            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
353                        task, initialCapability);
354       );
355
356   schedulePreLoop();
357
358   // -----------------------------------------------------------
359   // Scheduler loop starts here:
360
361 #if defined(PARALLEL_HASKELL)
362 #define TERMINATION_CONDITION        (!receivedFinish)
363 #elif defined(GRAN)
364 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
365 #else
366 #define TERMINATION_CONDITION        rtsTrue
367 #endif
368
369   while (TERMINATION_CONDITION) {
370
371 #if defined(GRAN)
372       /* Choose the processor with the next event */
373       CurrentProc = event->proc;
374       CurrentTSO = event->tso;
375 #endif
376
377 #if defined(THREADED_RTS)
378       if (first) {
379           // don't yield the first time, we want a chance to run this
380           // thread for a bit, even if there are others banging at the
381           // door.
382           first = rtsFalse;
383           ASSERT_CAPABILITY_INVARIANTS(cap,task);
384       } else {
385           // Yield the capability to higher-priority tasks if necessary.
386           yieldCapability(&cap, task);
387       }
388 #endif
389       
390 #ifdef SMP
391       schedulePushWork(cap,task);
392 #endif          
393
394     // Check whether we have re-entered the RTS from Haskell without
395     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
396     // call).
397     if (cap->in_haskell) {
398           errorBelch("schedule: re-entered unsafely.\n"
399                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
400           stg_exit(EXIT_FAILURE);
401     }
402
403     //
404     // Test for interruption.  If interrupted==rtsTrue, then either
405     // we received a keyboard interrupt (^C), or the scheduler is
406     // trying to shut down all the tasks (shutting_down_scheduler) in
407     // the threaded RTS.
408     //
409     if (interrupted) {
410         deleteRunQueue(cap);
411         if (shutting_down_scheduler) {
412             IF_DEBUG(scheduler, sched_belch("shutting down"));
413             // If we are a worker, just exit.  If we're a bound thread
414             // then we will exit below when we've removed our TSO from
415             // the run queue.
416             if (task->tso == NULL) {
417                 return cap;
418             }
419         } else {
420             IF_DEBUG(scheduler, sched_belch("interrupted"));
421         }
422     }
423
424 #if defined(not_yet) && defined(SMP)
425     //
426     // Top up the run queue from our spark pool.  We try to make the
427     // number of threads in the run queue equal to the number of
428     // free capabilities.
429     //
430     {
431         StgClosure *spark;
432         if (emptyRunQueue()) {
433             spark = findSpark(rtsFalse);
434             if (spark == NULL) {
435                 break; /* no more sparks in the pool */
436             } else {
437                 createSparkThread(spark);         
438                 IF_DEBUG(scheduler,
439                          sched_belch("==^^ turning spark of closure %p into a thread",
440                                      (StgClosure *)spark));
441             }
442         }
443     }
444 #endif // SMP
445
446     scheduleStartSignalHandlers();
447
448     // Only check the black holes here if we've nothing else to do.
449     // During normal execution, the black hole list only gets checked
450     // at GC time, to avoid repeatedly traversing this possibly long
451     // list each time around the scheduler.
452     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
453
454     scheduleCheckBlockedThreads(cap);
455
456     scheduleDetectDeadlock(cap,task);
457
458     // Normally, the only way we can get here with no threads to
459     // run is if a keyboard interrupt received during 
460     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
461     // Additionally, it is not fatal for the
462     // threaded RTS to reach here with no threads to run.
463     //
464     // win32: might be here due to awaitEvent() being abandoned
465     // as a result of a console event having been delivered.
466     if ( emptyRunQueue(cap) ) {
467 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
468         ASSERT(interrupted);
469 #endif
470         continue; // nothing to do
471     }
472
473 #if defined(PARALLEL_HASKELL)
474     scheduleSendPendingMessages();
475     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
476         continue;
477
478 #if defined(SPARKS)
479     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
480 #endif
481
482     /* If we still have no work we need to send a FISH to get a spark
483        from another PE */
484     if (emptyRunQueue(cap)) {
485         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
486         ASSERT(rtsFalse); // should not happen at the moment
487     }
488     // from here: non-empty run queue.
489     //  TODO: merge above case with this, only one call processMessages() !
490     if (PacketsWaiting()) {  /* process incoming messages, if
491                                 any pending...  only in else
492                                 because getRemoteWork waits for
493                                 messages as well */
494         receivedFinish = processMessages();
495     }
496 #endif
497
498 #if defined(GRAN)
499     scheduleProcessEvent(event);
500 #endif
501
502     // 
503     // Get a thread to run
504     //
505     t = popRunQueue(cap);
506
507 #if defined(GRAN) || defined(PAR)
508     scheduleGranParReport(); // some kind of debuging output
509 #else
510     // Sanity check the thread we're about to run.  This can be
511     // expensive if there is lots of thread switching going on...
512     IF_DEBUG(sanity,checkTSO(t));
513 #endif
514
515 #if defined(THREADED_RTS)
516     // Check whether we can run this thread in the current task.
517     // If not, we have to pass our capability to the right task.
518     {
519         Task *bound = t->bound;
520       
521         if (bound) {
522             if (bound == task) {
523                 IF_DEBUG(scheduler,
524                          sched_belch("### Running thread %d in bound thread",
525                                      t->id));
526                 // yes, the Haskell thread is bound to the current native thread
527             } else {
528                 IF_DEBUG(scheduler,
529                          sched_belch("### thread %d bound to another OS thread",
530                                      t->id));
531                 // no, bound to a different Haskell thread: pass to that thread
532                 pushOnRunQueue(cap,t);
533                 continue;
534             }
535         } else {
536             // The thread we want to run is unbound.
537             if (task->tso) { 
538                 IF_DEBUG(scheduler,
539                          sched_belch("### this OS thread cannot run thread %d", t->id));
540                 // no, the current native thread is bound to a different
541                 // Haskell thread, so pass it to any worker thread
542                 pushOnRunQueue(cap,t);
543                 continue; 
544             }
545         }
546     }
547 #endif
548
549     cap->r.rCurrentTSO = t;
550     
551     /* context switches are initiated by the timer signal, unless
552      * the user specified "context switch as often as possible", with
553      * +RTS -C0
554      */
555     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
556         && !emptyThreadQueues(cap)) {
557         context_switch = 1;
558     }
559          
560 run_thread:
561
562     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
563                               (long)t->id, whatNext_strs[t->what_next]));
564
565 #if defined(PROFILING)
566     startHeapProfTimer();
567 #endif
568
569     // ----------------------------------------------------------------------
570     // Run the current thread 
571
572     prev_what_next = t->what_next;
573
574     errno = t->saved_errno;
575     cap->in_haskell = rtsTrue;
576
577     recent_activity = ACTIVITY_YES;
578
579     switch (prev_what_next) {
580         
581     case ThreadKilled:
582     case ThreadComplete:
583         /* Thread already finished, return to scheduler. */
584         ret = ThreadFinished;
585         break;
586         
587     case ThreadRunGHC:
588     {
589         StgRegTable *r;
590         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
591         cap = regTableToCapability(r);
592         ret = r->rRet;
593         break;
594     }
595     
596     case ThreadInterpret:
597         cap = interpretBCO(cap);
598         ret = cap->r.rRet;
599         break;
600         
601     default:
602         barf("schedule: invalid what_next field");
603     }
604
605     cap->in_haskell = rtsFalse;
606
607     // The TSO might have moved, eg. if it re-entered the RTS and a GC
608     // happened.  So find the new location:
609     t = cap->r.rCurrentTSO;
610
611 #ifdef SMP
612     // If ret is ThreadBlocked, and this Task is bound to the TSO that
613     // blocked, we are in limbo - the TSO is now owned by whatever it
614     // is blocked on, and may in fact already have been woken up,
615     // perhaps even on a different Capability.  It may be the case
616     // that task->cap != cap.  We better yield this Capability
617     // immediately and return to normaility.
618     if (ret == ThreadBlocked) {
619         IF_DEBUG(scheduler,
620                  debugBelch("--<< thread %d (%s) stopped: blocked\n",
621                             t->id, whatNext_strs[t->what_next]));
622         continue;
623     }
624 #endif
625
626     ASSERT_CAPABILITY_INVARIANTS(cap,task);
627
628     // And save the current errno in this thread.
629     t->saved_errno = errno;
630
631     // ----------------------------------------------------------------------
632     
633     // Costs for the scheduler are assigned to CCS_SYSTEM
634 #if defined(PROFILING)
635     stopHeapProfTimer();
636     CCCS = CCS_SYSTEM;
637 #endif
638     
639     // We have run some Haskell code: there might be blackhole-blocked
640     // threads to wake up now.
641     // Lock-free test here should be ok, we're just setting a flag.
642     if ( blackhole_queue != END_TSO_QUEUE ) {
643         blackholes_need_checking = rtsTrue;
644     }
645     
646 #if defined(THREADED_RTS)
647     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
648 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
649     IF_DEBUG(scheduler,debugBelch("sched: "););
650 #endif
651     
652     schedulePostRunThread();
653
654     ready_to_gc = rtsFalse;
655
656     switch (ret) {
657     case HeapOverflow:
658         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
659         break;
660
661     case StackOverflow:
662         scheduleHandleStackOverflow(cap,task,t);
663         break;
664
665     case ThreadYielding:
666         if (scheduleHandleYield(cap, t, prev_what_next)) {
667             // shortcut for switching between compiler/interpreter:
668             goto run_thread; 
669         }
670         break;
671
672     case ThreadBlocked:
673         scheduleHandleThreadBlocked(t);
674         break;
675
676     case ThreadFinished:
677         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
678         ASSERT_CAPABILITY_INVARIANTS(cap,task);
679         break;
680
681     default:
682       barf("schedule: invalid thread return code %d", (int)ret);
683     }
684
685     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
686     if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
687   } /* end of while() */
688
689   IF_PAR_DEBUG(verbose,
690                debugBelch("== Leaving schedule() after having received Finish\n"));
691 }
692
693 /* ----------------------------------------------------------------------------
694  * Setting up the scheduler loop
695  * ------------------------------------------------------------------------- */
696
697 static void
698 schedulePreLoop(void)
699 {
700 #if defined(GRAN) 
701     /* set up first event to get things going */
702     /* ToDo: assign costs for system setup and init MainTSO ! */
703     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
704               ContinueThread, 
705               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
706     
707     IF_DEBUG(gran,
708              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
709                         CurrentTSO);
710              G_TSO(CurrentTSO, 5));
711     
712     if (RtsFlags.GranFlags.Light) {
713         /* Save current time; GranSim Light only */
714         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
715     }      
716 #endif
717 }
718
719 /* -----------------------------------------------------------------------------
720  * schedulePushWork()
721  *
722  * Push work to other Capabilities if we have some.
723  * -------------------------------------------------------------------------- */
724
725 static void
726 schedulePushWork(Capability *cap, Task *task)
727 {
728 #ifdef SMP
729     Capability *free_caps[n_capabilities], *cap0;
730     nat i, n_free_caps;
731
732     // Check whether we have more threads on our run queue that we
733     // could hand to another Capability.
734     if (emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) {
735         return;
736     }
737
738     // First grab as many free Capabilities as we can.
739     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
740         cap0 = &capabilities[i];
741         if (cap != cap0 && tryGrabCapability(cap0,task)) {
742             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
743                 // it already has some work, we just grabbed it at 
744                 // the wrong moment.  Or maybe it's deadlocked!
745                 releaseCapability(cap0);
746             } else {
747                 free_caps[n_free_caps++] = cap0;
748             }
749         }
750     }
751
752     // we now have n_free_caps free capabilities stashed in
753     // free_caps[].  Share our run queue equally with them.  This is
754     // probably the simplest thing we could do; improvements we might
755     // want to do include:
756     //
757     //   - giving high priority to moving relatively new threads, on 
758     //     the gournds that they haven't had time to build up a
759     //     working set in the cache on this CPU/Capability.
760     //
761     //   - giving low priority to moving long-lived threads
762
763     if (n_free_caps > 0) {
764         StgTSO *prev, *t, *next;
765         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
766
767         prev = cap->run_queue_hd;
768         t = prev->link;
769         prev->link = END_TSO_QUEUE;
770         i = 0;
771         for (; t != END_TSO_QUEUE; t = next) {
772             next = t->link;
773             t->link = END_TSO_QUEUE;
774             if (t->what_next == ThreadRelocated) {
775                 prev->link = t;
776                 prev = t;
777             } else if (i == n_free_caps) {
778                 i = 0;
779                 // keep one for us
780                 prev->link = t;
781                 prev = t;
782             } else {
783                 appendToRunQueue(free_caps[i],t);
784                 if (t->bound) { t->bound->cap = free_caps[i]; }
785                 i++;
786             }
787         }
788         cap->run_queue_tl = prev;
789
790         // release the capabilities
791         for (i = 0; i < n_free_caps; i++) {
792             task->cap = free_caps[i];
793             releaseCapability(free_caps[i]);
794         }
795     }
796     task->cap = cap; // reset to point to our Capability.
797 #endif
798 }
799
800 /* ----------------------------------------------------------------------------
801  * Start any pending signal handlers
802  * ------------------------------------------------------------------------- */
803
804 static void
805 scheduleStartSignalHandlers(void)
806 {
807 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
808     if (signals_pending()) { // safe outside the lock
809         startSignalHandlers();
810     }
811 #endif
812 }
813
814 /* ----------------------------------------------------------------------------
815  * Check for blocked threads that can be woken up.
816  * ------------------------------------------------------------------------- */
817
818 static void
819 scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
820 {
821 #if !defined(THREADED_RTS)
822     //
823     // Check whether any waiting threads need to be woken up.  If the
824     // run queue is empty, and there are no other tasks running, we
825     // can wait indefinitely for something to happen.
826     //
827     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
828     {
829         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
830     }
831 #endif
832 }
833
834
835 /* ----------------------------------------------------------------------------
836  * Check for threads blocked on BLACKHOLEs that can be woken up
837  * ------------------------------------------------------------------------- */
838 static void
839 scheduleCheckBlackHoles (Capability *cap)
840 {
841     if ( blackholes_need_checking ) // check without the lock first
842     {
843         ACQUIRE_LOCK(&sched_mutex);
844         if ( blackholes_need_checking ) {
845             checkBlackHoles(cap);
846             blackholes_need_checking = rtsFalse;
847         }
848         RELEASE_LOCK(&sched_mutex);
849     }
850 }
851
852 /* ----------------------------------------------------------------------------
853  * Detect deadlock conditions and attempt to resolve them.
854  * ------------------------------------------------------------------------- */
855
856 static void
857 scheduleDetectDeadlock (Capability *cap, Task *task)
858 {
859
860 #if defined(PARALLEL_HASKELL)
861     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
862     return;
863 #endif
864
865     /* 
866      * Detect deadlock: when we have no threads to run, there are no
867      * threads blocked, waiting for I/O, or sleeping, and all the
868      * other tasks are waiting for work, we must have a deadlock of
869      * some description.
870      */
871     if ( emptyThreadQueues(cap) )
872     {
873 #if defined(THREADED_RTS)
874         /* 
875          * In the threaded RTS, we only check for deadlock if there
876          * has been no activity in a complete timeslice.  This means
877          * we won't eagerly start a full GC just because we don't have
878          * any threads to run currently.
879          */
880         if (recent_activity != ACTIVITY_INACTIVE) return;
881 #endif
882
883         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
884
885         // Garbage collection can release some new threads due to
886         // either (a) finalizers or (b) threads resurrected because
887         // they are unreachable and will therefore be sent an
888         // exception.  Any threads thus released will be immediately
889         // runnable.
890         scheduleDoGC( cap, task, rtsTrue/*force  major GC*/ );
891         recent_activity = ACTIVITY_DONE_GC;
892         
893         if ( !emptyRunQueue(cap) ) return;
894
895 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
896         /* If we have user-installed signal handlers, then wait
897          * for signals to arrive rather then bombing out with a
898          * deadlock.
899          */
900         if ( anyUserHandlers() ) {
901             IF_DEBUG(scheduler, 
902                      sched_belch("still deadlocked, waiting for signals..."));
903
904             awaitUserSignals();
905
906             if (signals_pending()) {
907                 startSignalHandlers();
908             }
909
910             // either we have threads to run, or we were interrupted:
911             ASSERT(!emptyRunQueue(cap) || interrupted);
912         }
913 #endif
914
915 #if !defined(THREADED_RTS)
916         /* Probably a real deadlock.  Send the current main thread the
917          * Deadlock exception.
918          */
919         if (task->tso) {
920             switch (task->tso->why_blocked) {
921             case BlockedOnSTM:
922             case BlockedOnBlackHole:
923             case BlockedOnException:
924             case BlockedOnMVar:
925                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
926                 return;
927             default:
928                 barf("deadlock: main thread blocked in a strange way");
929             }
930         }
931         return;
932 #endif
933     }
934 }
935
936 /* ----------------------------------------------------------------------------
937  * Process an event (GRAN only)
938  * ------------------------------------------------------------------------- */
939
940 #if defined(GRAN)
941 static StgTSO *
942 scheduleProcessEvent(rtsEvent *event)
943 {
944     StgTSO *t;
945
946     if (RtsFlags.GranFlags.Light)
947       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
948
949     /* adjust time based on time-stamp */
950     if (event->time > CurrentTime[CurrentProc] &&
951         event->evttype != ContinueThread)
952       CurrentTime[CurrentProc] = event->time;
953     
954     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
955     if (!RtsFlags.GranFlags.Light)
956       handleIdlePEs();
957
958     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
959
960     /* main event dispatcher in GranSim */
961     switch (event->evttype) {
962       /* Should just be continuing execution */
963     case ContinueThread:
964       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
965       /* ToDo: check assertion
966       ASSERT(run_queue_hd != (StgTSO*)NULL &&
967              run_queue_hd != END_TSO_QUEUE);
968       */
969       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
970       if (!RtsFlags.GranFlags.DoAsyncFetch &&
971           procStatus[CurrentProc]==Fetching) {
972         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
973               CurrentTSO->id, CurrentTSO, CurrentProc);
974         goto next_thread;
975       } 
976       /* Ignore ContinueThreads for completed threads */
977       if (CurrentTSO->what_next == ThreadComplete) {
978         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
979               CurrentTSO->id, CurrentTSO, CurrentProc);
980         goto next_thread;
981       } 
982       /* Ignore ContinueThreads for threads that are being migrated */
983       if (PROCS(CurrentTSO)==Nowhere) { 
984         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
985               CurrentTSO->id, CurrentTSO, CurrentProc);
986         goto next_thread;
987       }
988       /* The thread should be at the beginning of the run queue */
989       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
990         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
991               CurrentTSO->id, CurrentTSO, CurrentProc);
992         break; // run the thread anyway
993       }
994       /*
995       new_event(proc, proc, CurrentTime[proc],
996                 FindWork,
997                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
998       goto next_thread; 
999       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1000       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1001
1002     case FetchNode:
1003       do_the_fetchnode(event);
1004       goto next_thread;             /* handle next event in event queue  */
1005       
1006     case GlobalBlock:
1007       do_the_globalblock(event);
1008       goto next_thread;             /* handle next event in event queue  */
1009       
1010     case FetchReply:
1011       do_the_fetchreply(event);
1012       goto next_thread;             /* handle next event in event queue  */
1013       
1014     case UnblockThread:   /* Move from the blocked queue to the tail of */
1015       do_the_unblock(event);
1016       goto next_thread;             /* handle next event in event queue  */
1017       
1018     case ResumeThread:  /* Move from the blocked queue to the tail of */
1019       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1020       event->tso->gran.blocktime += 
1021         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1022       do_the_startthread(event);
1023       goto next_thread;             /* handle next event in event queue  */
1024       
1025     case StartThread:
1026       do_the_startthread(event);
1027       goto next_thread;             /* handle next event in event queue  */
1028       
1029     case MoveThread:
1030       do_the_movethread(event);
1031       goto next_thread;             /* handle next event in event queue  */
1032       
1033     case MoveSpark:
1034       do_the_movespark(event);
1035       goto next_thread;             /* handle next event in event queue  */
1036       
1037     case FindWork:
1038       do_the_findwork(event);
1039       goto next_thread;             /* handle next event in event queue  */
1040       
1041     default:
1042       barf("Illegal event type %u\n", event->evttype);
1043     }  /* switch */
1044     
1045     /* This point was scheduler_loop in the old RTS */
1046
1047     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1048
1049     TimeOfLastEvent = CurrentTime[CurrentProc];
1050     TimeOfNextEvent = get_time_of_next_event();
1051     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1052     // CurrentTSO = ThreadQueueHd;
1053
1054     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1055                          TimeOfNextEvent));
1056
1057     if (RtsFlags.GranFlags.Light) 
1058       GranSimLight_leave_system(event, &ActiveTSO); 
1059
1060     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1061
1062     IF_DEBUG(gran, 
1063              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1064
1065     /* in a GranSim setup the TSO stays on the run queue */
1066     t = CurrentTSO;
1067     /* Take a thread from the run queue. */
1068     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1069
1070     IF_DEBUG(gran, 
1071              debugBelch("GRAN: About to run current thread, which is\n");
1072              G_TSO(t,5));
1073
1074     context_switch = 0; // turned on via GranYield, checking events and time slice
1075
1076     IF_DEBUG(gran, 
1077              DumpGranEvent(GR_SCHEDULE, t));
1078
1079     procStatus[CurrentProc] = Busy;
1080 }
1081 #endif // GRAN
1082
1083 /* ----------------------------------------------------------------------------
1084  * Send pending messages (PARALLEL_HASKELL only)
1085  * ------------------------------------------------------------------------- */
1086
1087 #if defined(PARALLEL_HASKELL)
1088 static StgTSO *
1089 scheduleSendPendingMessages(void)
1090 {
1091     StgSparkPool *pool;
1092     rtsSpark spark;
1093     StgTSO *t;
1094
1095 # if defined(PAR) // global Mem.Mgmt., omit for now
1096     if (PendingFetches != END_BF_QUEUE) {
1097         processFetches();
1098     }
1099 # endif
1100     
1101     if (RtsFlags.ParFlags.BufferTime) {
1102         // if we use message buffering, we must send away all message
1103         // packets which have become too old...
1104         sendOldBuffers(); 
1105     }
1106 }
1107 #endif
1108
1109 /* ----------------------------------------------------------------------------
1110  * Activate spark threads (PARALLEL_HASKELL only)
1111  * ------------------------------------------------------------------------- */
1112
1113 #if defined(PARALLEL_HASKELL)
1114 static void
1115 scheduleActivateSpark(void)
1116 {
1117 #if defined(SPARKS)
1118   ASSERT(emptyRunQueue());
1119 /* We get here if the run queue is empty and want some work.
1120    We try to turn a spark into a thread, and add it to the run queue,
1121    from where it will be picked up in the next iteration of the scheduler
1122    loop.
1123 */
1124
1125       /* :-[  no local threads => look out for local sparks */
1126       /* the spark pool for the current PE */
1127       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1128       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1129           pool->hd < pool->tl) {
1130         /* 
1131          * ToDo: add GC code check that we really have enough heap afterwards!!
1132          * Old comment:
1133          * If we're here (no runnable threads) and we have pending
1134          * sparks, we must have a space problem.  Get enough space
1135          * to turn one of those pending sparks into a
1136          * thread... 
1137          */
1138
1139         spark = findSpark(rtsFalse);            /* get a spark */
1140         if (spark != (rtsSpark) NULL) {
1141           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1142           IF_PAR_DEBUG(fish, // schedule,
1143                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1144                              tso->id, tso, advisory_thread_count));
1145
1146           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1147             IF_PAR_DEBUG(fish, // schedule,
1148                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1149                             spark));
1150             return rtsFalse; /* failed to generate a thread */
1151           }                  /* otherwise fall through & pick-up new tso */
1152         } else {
1153           IF_PAR_DEBUG(fish, // schedule,
1154                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1155                              spark_queue_len(pool)));
1156           return rtsFalse;  /* failed to generate a thread */
1157         }
1158         return rtsTrue;  /* success in generating a thread */
1159   } else { /* no more threads permitted or pool empty */
1160     return rtsFalse;  /* failed to generateThread */
1161   }
1162 #else
1163   tso = NULL; // avoid compiler warning only
1164   return rtsFalse;  /* dummy in non-PAR setup */
1165 #endif // SPARKS
1166 }
1167 #endif // PARALLEL_HASKELL
1168
1169 /* ----------------------------------------------------------------------------
1170  * Get work from a remote node (PARALLEL_HASKELL only)
1171  * ------------------------------------------------------------------------- */
1172     
1173 #if defined(PARALLEL_HASKELL)
1174 static rtsBool
1175 scheduleGetRemoteWork(rtsBool *receivedFinish)
1176 {
1177   ASSERT(emptyRunQueue());
1178
1179   if (RtsFlags.ParFlags.BufferTime) {
1180         IF_PAR_DEBUG(verbose, 
1181                 debugBelch("...send all pending data,"));
1182         {
1183           nat i;
1184           for (i=1; i<=nPEs; i++)
1185             sendImmediately(i); // send all messages away immediately
1186         }
1187   }
1188 # ifndef SPARKS
1189         //++EDEN++ idle() , i.e. send all buffers, wait for work
1190         // suppress fishing in EDEN... just look for incoming messages
1191         // (blocking receive)
1192   IF_PAR_DEBUG(verbose, 
1193                debugBelch("...wait for incoming messages...\n"));
1194   *receivedFinish = processMessages(); // blocking receive...
1195
1196         // and reenter scheduling loop after having received something
1197         // (return rtsFalse below)
1198
1199 # else /* activate SPARKS machinery */
1200 /* We get here, if we have no work, tried to activate a local spark, but still
1201    have no work. We try to get a remote spark, by sending a FISH message.
1202    Thread migration should be added here, and triggered when a sequence of 
1203    fishes returns without work. */
1204         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1205
1206       /* =8-[  no local sparks => look for work on other PEs */
1207         /*
1208          * We really have absolutely no work.  Send out a fish
1209          * (there may be some out there already), and wait for
1210          * something to arrive.  We clearly can't run any threads
1211          * until a SCHEDULE or RESUME arrives, and so that's what
1212          * we're hoping to see.  (Of course, we still have to
1213          * respond to other types of messages.)
1214          */
1215         rtsTime now = msTime() /*CURRENT_TIME*/;
1216         IF_PAR_DEBUG(verbose, 
1217                      debugBelch("--  now=%ld\n", now));
1218         IF_PAR_DEBUG(fish, // verbose,
1219              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1220                  (last_fish_arrived_at!=0 &&
1221                   last_fish_arrived_at+delay > now)) {
1222                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1223                      now, last_fish_arrived_at+delay, 
1224                      last_fish_arrived_at,
1225                      delay);
1226              });
1227   
1228         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1229             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1230           if (last_fish_arrived_at==0 ||
1231               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1232             /* outstandingFishes is set in sendFish, processFish;
1233                avoid flooding system with fishes via delay */
1234     next_fish_to_send_at = 0;  
1235   } else {
1236     /* ToDo: this should be done in the main scheduling loop to avoid the
1237              busy wait here; not so bad if fish delay is very small  */
1238     int iq = 0; // DEBUGGING -- HWL
1239     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1240     /* send a fish when ready, but process messages that arrive in the meantime */
1241     do {
1242       if (PacketsWaiting()) {
1243         iq++; // DEBUGGING
1244         *receivedFinish = processMessages();
1245       }
1246       now = msTime();
1247     } while (!*receivedFinish || now<next_fish_to_send_at);
1248     // JB: This means the fish could become obsolete, if we receive
1249     // work. Better check for work again? 
1250     // last line: while (!receivedFinish || !haveWork || now<...)
1251     // next line: if (receivedFinish || haveWork )
1252
1253     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1254       return rtsFalse;  // NB: this will leave scheduler loop
1255                         // immediately after return!
1256                           
1257     IF_PAR_DEBUG(fish, // verbose,
1258                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1259
1260   }
1261
1262     // JB: IMHO, this should all be hidden inside sendFish(...)
1263     /* pe = choosePE(); 
1264        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1265                 NEW_FISH_HUNGER);
1266
1267     // Global statistics: count no. of fishes
1268     if (RtsFlags.ParFlags.ParStats.Global &&
1269          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1270            globalParStats.tot_fish_mess++;
1271            }
1272     */ 
1273
1274   /* delayed fishes must have been sent by now! */
1275   next_fish_to_send_at = 0;  
1276   }
1277       
1278   *receivedFinish = processMessages();
1279 # endif /* SPARKS */
1280
1281  return rtsFalse;
1282  /* NB: this function always returns rtsFalse, meaning the scheduler
1283     loop continues with the next iteration; 
1284     rationale: 
1285       return code means success in finding work; we enter this function
1286       if there is no local work, thus have to send a fish which takes
1287       time until it arrives with work; in the meantime we should process
1288       messages in the main loop;
1289  */
1290 }
1291 #endif // PARALLEL_HASKELL
1292
1293 /* ----------------------------------------------------------------------------
1294  * PAR/GRAN: Report stats & debugging info(?)
1295  * ------------------------------------------------------------------------- */
1296
1297 #if defined(PAR) || defined(GRAN)
1298 static void
1299 scheduleGranParReport(void)
1300 {
1301   ASSERT(run_queue_hd != END_TSO_QUEUE);
1302
1303   /* Take a thread from the run queue, if we have work */
1304   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1305
1306     /* If this TSO has got its outport closed in the meantime, 
1307      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1308      * It has to be marked as TH_DEAD for this purpose.
1309      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1310
1311 JB: TODO: investigate wether state change field could be nuked
1312      entirely and replaced by the normal tso state (whatnext
1313      field). All we want to do is to kill tsos from outside.
1314      */
1315
1316     /* ToDo: write something to the log-file
1317     if (RTSflags.ParFlags.granSimStats && !sameThread)
1318         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1319
1320     CurrentTSO = t;
1321     */
1322     /* the spark pool for the current PE */
1323     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1324
1325     IF_DEBUG(scheduler, 
1326              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1327                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1328
1329     IF_PAR_DEBUG(fish,
1330              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1331                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1332
1333     if (RtsFlags.ParFlags.ParStats.Full && 
1334         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1335         (emitSchedule || // forced emit
1336          (t && LastTSO && t->id != LastTSO->id))) {
1337       /* 
1338          we are running a different TSO, so write a schedule event to log file
1339          NB: If we use fair scheduling we also have to write  a deschedule 
1340              event for LastTSO; with unfair scheduling we know that the
1341              previous tso has blocked whenever we switch to another tso, so
1342              we don't need it in GUM for now
1343       */
1344       IF_PAR_DEBUG(fish, // schedule,
1345                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1346
1347       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1348                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1349       emitSchedule = rtsFalse;
1350     }
1351 }     
1352 #endif
1353
1354 /* ----------------------------------------------------------------------------
1355  * After running a thread...
1356  * ------------------------------------------------------------------------- */
1357
1358 static void
1359 schedulePostRunThread(void)
1360 {
1361 #if defined(PAR)
1362     /* HACK 675: if the last thread didn't yield, make sure to print a 
1363        SCHEDULE event to the log file when StgRunning the next thread, even
1364        if it is the same one as before */
1365     LastTSO = t; 
1366     TimeOfLastYield = CURRENT_TIME;
1367 #endif
1368
1369   /* some statistics gathering in the parallel case */
1370
1371 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1372   switch (ret) {
1373     case HeapOverflow:
1374 # if defined(GRAN)
1375       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1376       globalGranStats.tot_heapover++;
1377 # elif defined(PAR)
1378       globalParStats.tot_heapover++;
1379 # endif
1380       break;
1381
1382      case StackOverflow:
1383 # if defined(GRAN)
1384       IF_DEBUG(gran, 
1385                DumpGranEvent(GR_DESCHEDULE, t));
1386       globalGranStats.tot_stackover++;
1387 # elif defined(PAR)
1388       // IF_DEBUG(par, 
1389       // DumpGranEvent(GR_DESCHEDULE, t);
1390       globalParStats.tot_stackover++;
1391 # endif
1392       break;
1393
1394     case ThreadYielding:
1395 # if defined(GRAN)
1396       IF_DEBUG(gran, 
1397                DumpGranEvent(GR_DESCHEDULE, t));
1398       globalGranStats.tot_yields++;
1399 # elif defined(PAR)
1400       // IF_DEBUG(par, 
1401       // DumpGranEvent(GR_DESCHEDULE, t);
1402       globalParStats.tot_yields++;
1403 # endif
1404       break; 
1405
1406     case ThreadBlocked:
1407 # if defined(GRAN)
1408       IF_DEBUG(scheduler,
1409                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1410                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1411                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1412                if (t->block_info.closure!=(StgClosure*)NULL)
1413                  print_bq(t->block_info.closure);
1414                debugBelch("\n"));
1415
1416       // ??? needed; should emit block before
1417       IF_DEBUG(gran, 
1418                DumpGranEvent(GR_DESCHEDULE, t)); 
1419       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1420       /*
1421         ngoq Dogh!
1422       ASSERT(procStatus[CurrentProc]==Busy || 
1423               ((procStatus[CurrentProc]==Fetching) && 
1424               (t->block_info.closure!=(StgClosure*)NULL)));
1425       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1426           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1427             procStatus[CurrentProc]==Fetching)) 
1428         procStatus[CurrentProc] = Idle;
1429       */
1430 # elif defined(PAR)
1431 //++PAR++  blockThread() writes the event (change?)
1432 # endif
1433     break;
1434
1435   case ThreadFinished:
1436     break;
1437
1438   default:
1439     barf("parGlobalStats: unknown return code");
1440     break;
1441     }
1442 #endif
1443 }
1444
1445 /* -----------------------------------------------------------------------------
1446  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1447  * -------------------------------------------------------------------------- */
1448
1449 static rtsBool
1450 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1451 {
1452     // did the task ask for a large block?
1453     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1454         // if so, get one and push it on the front of the nursery.
1455         bdescr *bd;
1456         lnat blocks;
1457         
1458         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1459         
1460         IF_DEBUG(scheduler,
1461                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1462                             (long)t->id, whatNext_strs[t->what_next], blocks));
1463         
1464         // don't do this if the nursery is (nearly) full, we'll GC first.
1465         if (cap->r.rCurrentNursery->link != NULL ||
1466             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1467                                                // if the nursery has only one block.
1468             
1469             ACQUIRE_SM_LOCK
1470             bd = allocGroup( blocks );
1471             RELEASE_SM_LOCK
1472             cap->r.rNursery->n_blocks += blocks;
1473             
1474             // link the new group into the list
1475             bd->link = cap->r.rCurrentNursery;
1476             bd->u.back = cap->r.rCurrentNursery->u.back;
1477             if (cap->r.rCurrentNursery->u.back != NULL) {
1478                 cap->r.rCurrentNursery->u.back->link = bd;
1479             } else {
1480 #if !defined(SMP)
1481                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1482                        g0s0 == cap->r.rNursery);
1483 #endif
1484                 cap->r.rNursery->blocks = bd;
1485             }             
1486             cap->r.rCurrentNursery->u.back = bd;
1487             
1488             // initialise it as a nursery block.  We initialise the
1489             // step, gen_no, and flags field of *every* sub-block in
1490             // this large block, because this is easier than making
1491             // sure that we always find the block head of a large
1492             // block whenever we call Bdescr() (eg. evacuate() and
1493             // isAlive() in the GC would both have to do this, at
1494             // least).
1495             { 
1496                 bdescr *x;
1497                 for (x = bd; x < bd + blocks; x++) {
1498                     x->step = cap->r.rNursery;
1499                     x->gen_no = 0;
1500                     x->flags = 0;
1501                 }
1502             }
1503             
1504             // This assert can be a killer if the app is doing lots
1505             // of large block allocations.
1506             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1507             
1508             // now update the nursery to point to the new block
1509             cap->r.rCurrentNursery = bd;
1510             
1511             // we might be unlucky and have another thread get on the
1512             // run queue before us and steal the large block, but in that
1513             // case the thread will just end up requesting another large
1514             // block.
1515             pushOnRunQueue(cap,t);
1516             return rtsFalse;  /* not actually GC'ing */
1517         }
1518     }
1519     
1520     IF_DEBUG(scheduler,
1521              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1522                         (long)t->id, whatNext_strs[t->what_next]));
1523 #if defined(GRAN)
1524     ASSERT(!is_on_queue(t,CurrentProc));
1525 #elif defined(PARALLEL_HASKELL)
1526     /* Currently we emit a DESCHEDULE event before GC in GUM.
1527        ToDo: either add separate event to distinguish SYSTEM time from rest
1528        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1529     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1530         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1531                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1532         emitSchedule = rtsTrue;
1533     }
1534 #endif
1535       
1536     pushOnRunQueue(cap,t);
1537     return rtsTrue;
1538     /* actual GC is done at the end of the while loop in schedule() */
1539 }
1540
1541 /* -----------------------------------------------------------------------------
1542  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1543  * -------------------------------------------------------------------------- */
1544
1545 static void
1546 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1547 {
1548     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1549                                   (long)t->id, whatNext_strs[t->what_next]));
1550     /* just adjust the stack for this thread, then pop it back
1551      * on the run queue.
1552      */
1553     { 
1554         /* enlarge the stack */
1555         StgTSO *new_t = threadStackOverflow(cap, t);
1556         
1557         /* This TSO has moved, so update any pointers to it from the
1558          * main thread stack.  It better not be on any other queues...
1559          * (it shouldn't be).
1560          */
1561         if (task->tso != NULL) {
1562             task->tso = new_t;
1563         }
1564         pushOnRunQueue(cap,new_t);
1565     }
1566 }
1567
1568 /* -----------------------------------------------------------------------------
1569  * Handle a thread that returned to the scheduler with ThreadYielding
1570  * -------------------------------------------------------------------------- */
1571
1572 static rtsBool
1573 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1574 {
1575     // Reset the context switch flag.  We don't do this just before
1576     // running the thread, because that would mean we would lose ticks
1577     // during GC, which can lead to unfair scheduling (a thread hogs
1578     // the CPU because the tick always arrives during GC).  This way
1579     // penalises threads that do a lot of allocation, but that seems
1580     // better than the alternative.
1581     context_switch = 0;
1582     
1583     /* put the thread back on the run queue.  Then, if we're ready to
1584      * GC, check whether this is the last task to stop.  If so, wake
1585      * up the GC thread.  getThread will block during a GC until the
1586      * GC is finished.
1587      */
1588     IF_DEBUG(scheduler,
1589              if (t->what_next != prev_what_next) {
1590                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1591                             (long)t->id, whatNext_strs[t->what_next]);
1592              } else {
1593                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1594                             (long)t->id, whatNext_strs[t->what_next]);
1595              }
1596         );
1597     
1598     IF_DEBUG(sanity,
1599              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1600              checkTSO(t));
1601     ASSERT(t->link == END_TSO_QUEUE);
1602     
1603     // Shortcut if we're just switching evaluators: don't bother
1604     // doing stack squeezing (which can be expensive), just run the
1605     // thread.
1606     if (t->what_next != prev_what_next) {
1607         return rtsTrue;
1608     }
1609     
1610 #if defined(GRAN)
1611     ASSERT(!is_on_queue(t,CurrentProc));
1612       
1613     IF_DEBUG(sanity,
1614              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1615              checkThreadQsSanity(rtsTrue));
1616
1617 #endif
1618
1619     addToRunQueue(cap,t);
1620
1621 #if defined(GRAN)
1622     /* add a ContinueThread event to actually process the thread */
1623     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1624               ContinueThread,
1625               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1626     IF_GRAN_DEBUG(bq, 
1627                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1628                   G_EVENTQ(0);
1629                   G_CURR_THREADQ(0));
1630 #endif
1631     return rtsFalse;
1632 }
1633
1634 /* -----------------------------------------------------------------------------
1635  * Handle a thread that returned to the scheduler with ThreadBlocked
1636  * -------------------------------------------------------------------------- */
1637
1638 static void
1639 scheduleHandleThreadBlocked( StgTSO *t
1640 #if !defined(GRAN) && !defined(DEBUG)
1641     STG_UNUSED
1642 #endif
1643     )
1644 {
1645 #if defined(GRAN)
1646     IF_DEBUG(scheduler,
1647              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1648                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1649              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1650     
1651     // ??? needed; should emit block before
1652     IF_DEBUG(gran, 
1653              DumpGranEvent(GR_DESCHEDULE, t)); 
1654     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1655     /*
1656       ngoq Dogh!
1657       ASSERT(procStatus[CurrentProc]==Busy || 
1658       ((procStatus[CurrentProc]==Fetching) && 
1659       (t->block_info.closure!=(StgClosure*)NULL)));
1660       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1661       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1662       procStatus[CurrentProc]==Fetching)) 
1663       procStatus[CurrentProc] = Idle;
1664     */
1665 #elif defined(PAR)
1666     IF_DEBUG(scheduler,
1667              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1668                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1669     IF_PAR_DEBUG(bq,
1670                  
1671                  if (t->block_info.closure!=(StgClosure*)NULL) 
1672                  print_bq(t->block_info.closure));
1673     
1674     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1675     blockThread(t);
1676     
1677     /* whatever we schedule next, we must log that schedule */
1678     emitSchedule = rtsTrue;
1679     
1680 #else /* !GRAN */
1681
1682       // We don't need to do anything.  The thread is blocked, and it
1683       // has tidied up its stack and placed itself on whatever queue
1684       // it needs to be on.
1685
1686 #if !defined(SMP)
1687     ASSERT(t->why_blocked != NotBlocked);
1688              // This might not be true under SMP: we don't have
1689              // exclusive access to this TSO, so someone might have
1690              // woken it up by now.  This actually happens: try
1691              // conc023 +RTS -N2.
1692 #endif
1693
1694     IF_DEBUG(scheduler,
1695              debugBelch("--<< thread %d (%s) stopped: ", 
1696                         t->id, whatNext_strs[t->what_next]);
1697              printThreadBlockage(t);
1698              debugBelch("\n"));
1699     
1700     /* Only for dumping event to log file 
1701        ToDo: do I need this in GranSim, too?
1702        blockThread(t);
1703     */
1704 #endif
1705 }
1706
1707 /* -----------------------------------------------------------------------------
1708  * Handle a thread that returned to the scheduler with ThreadFinished
1709  * -------------------------------------------------------------------------- */
1710
1711 static rtsBool
1712 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1713 {
1714     /* Need to check whether this was a main thread, and if so,
1715      * return with the return value.
1716      *
1717      * We also end up here if the thread kills itself with an
1718      * uncaught exception, see Exception.cmm.
1719      */
1720     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1721                                   t->id, whatNext_strs[t->what_next]));
1722
1723 #if defined(GRAN)
1724       endThread(t, CurrentProc); // clean-up the thread
1725 #elif defined(PARALLEL_HASKELL)
1726       /* For now all are advisory -- HWL */
1727       //if(t->priority==AdvisoryPriority) ??
1728       advisory_thread_count--; // JB: Caution with this counter, buggy!
1729       
1730 # if defined(DIST)
1731       if(t->dist.priority==RevalPriority)
1732         FinishReval(t);
1733 # endif
1734     
1735 # if defined(EDENOLD)
1736       // the thread could still have an outport... (BUG)
1737       if (t->eden.outport != -1) {
1738       // delete the outport for the tso which has finished...
1739         IF_PAR_DEBUG(eden_ports,
1740                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1741                               t->eden.outport, t->id));
1742         deleteOPT(t);
1743       }
1744       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1745       if (t->eden.epid != -1) {
1746         IF_PAR_DEBUG(eden_ports,
1747                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1748                            t->id, t->eden.epid));
1749         removeTSOfromProcess(t);
1750       }
1751 # endif 
1752
1753 # if defined(PAR)
1754       if (RtsFlags.ParFlags.ParStats.Full &&
1755           !RtsFlags.ParFlags.ParStats.Suppressed) 
1756         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1757
1758       //  t->par only contains statistics: left out for now...
1759       IF_PAR_DEBUG(fish,
1760                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1761                               t->id,t,t->par.sparkname));
1762 # endif
1763 #endif // PARALLEL_HASKELL
1764
1765       //
1766       // Check whether the thread that just completed was a bound
1767       // thread, and if so return with the result.  
1768       //
1769       // There is an assumption here that all thread completion goes
1770       // through this point; we need to make sure that if a thread
1771       // ends up in the ThreadKilled state, that it stays on the run
1772       // queue so it can be dealt with here.
1773       //
1774
1775       if (t->bound) {
1776
1777           if (t->bound != task) {
1778 #if !defined(THREADED_RTS)
1779               // Must be a bound thread that is not the topmost one.  Leave
1780               // it on the run queue until the stack has unwound to the
1781               // point where we can deal with this.  Leaving it on the run
1782               // queue also ensures that the garbage collector knows about
1783               // this thread and its return value (it gets dropped from the
1784               // all_threads list so there's no other way to find it).
1785               appendToRunQueue(cap,t);
1786               return rtsFalse;
1787 #else
1788               // this cannot happen in the threaded RTS, because a
1789               // bound thread can only be run by the appropriate Task.
1790               barf("finished bound thread that isn't mine");
1791 #endif
1792           }
1793
1794           ASSERT(task->tso == t);
1795
1796           if (t->what_next == ThreadComplete) {
1797               if (task->ret) {
1798                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1799                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1800               }
1801               task->stat = Success;
1802           } else {
1803               if (task->ret) {
1804                   *(task->ret) = NULL;
1805               }
1806               if (interrupted) {
1807                   task->stat = Interrupted;
1808               } else {
1809                   task->stat = Killed;
1810               }
1811           }
1812 #ifdef DEBUG
1813           removeThreadLabel((StgWord)task->tso->id);
1814 #endif
1815           return rtsTrue; // tells schedule() to return
1816       }
1817
1818       return rtsFalse;
1819 }
1820
1821 /* -----------------------------------------------------------------------------
1822  * Perform a heap census, if PROFILING
1823  * -------------------------------------------------------------------------- */
1824
1825 static rtsBool
1826 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1827 {
1828 #if defined(PROFILING)
1829     // When we have +RTS -i0 and we're heap profiling, do a census at
1830     // every GC.  This lets us get repeatable runs for debugging.
1831     if (performHeapProfile ||
1832         (RtsFlags.ProfFlags.profileInterval==0 &&
1833          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1834         GarbageCollect(GetRoots, rtsTrue);
1835         heapCensus();
1836         performHeapProfile = rtsFalse;
1837         return rtsTrue;  // true <=> we already GC'd
1838     }
1839 #endif
1840     return rtsFalse;
1841 }
1842
1843 /* -----------------------------------------------------------------------------
1844  * Perform a garbage collection if necessary
1845  * -------------------------------------------------------------------------- */
1846
1847 static void
1848 scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
1849 {
1850     StgTSO *t;
1851 #ifdef SMP
1852     static volatile StgWord waiting_for_gc;
1853     rtsBool was_waiting;
1854     nat i;
1855 #endif
1856
1857 #ifdef SMP
1858     // In order to GC, there must be no threads running Haskell code.
1859     // Therefore, the GC thread needs to hold *all* the capabilities,
1860     // and release them after the GC has completed.  
1861     //
1862     // This seems to be the simplest way: previous attempts involved
1863     // making all the threads with capabilities give up their
1864     // capabilities and sleep except for the *last* one, which
1865     // actually did the GC.  But it's quite hard to arrange for all
1866     // the other tasks to sleep and stay asleep.
1867     //
1868         
1869     was_waiting = cas(&waiting_for_gc, 0, 1);
1870     if (was_waiting) {
1871         do {
1872             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1873             yieldCapability(&cap,task);
1874         } while (waiting_for_gc);
1875         return;
1876     }
1877
1878     for (i=0; i < n_capabilities; i++) {
1879         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1880         if (cap != &capabilities[i]) {
1881             Capability *pcap = &capabilities[i];
1882             // we better hope this task doesn't get migrated to
1883             // another Capability while we're waiting for this one.
1884             // It won't, because load balancing happens while we have
1885             // all the Capabilities, but even so it's a slightly
1886             // unsavoury invariant.
1887             task->cap = pcap;
1888             context_switch = 1;
1889             waitForReturnCapability(&pcap, task);
1890             if (pcap != &capabilities[i]) {
1891                 barf("scheduleDoGC: got the wrong capability");
1892             }
1893         }
1894     }
1895
1896     waiting_for_gc = rtsFalse;
1897 #endif
1898
1899     /* Kick any transactions which are invalid back to their
1900      * atomically frames.  When next scheduled they will try to
1901      * commit, this commit will fail and they will retry.
1902      */
1903     { 
1904         StgTSO *next;
1905
1906         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1907             if (t->what_next == ThreadRelocated) {
1908                 next = t->link;
1909             } else {
1910                 next = t->global_link;
1911                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1912                     if (!stmValidateNestOfTransactions (t -> trec)) {
1913                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1914                         
1915                         // strip the stack back to the
1916                         // ATOMICALLY_FRAME, aborting the (nested)
1917                         // transaction, and saving the stack of any
1918                         // partially-evaluated thunks on the heap.
1919                         raiseAsync_(cap, t, NULL, rtsTrue);
1920                         
1921 #ifdef REG_R1
1922                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1923 #endif
1924                     }
1925                 }
1926             }
1927         }
1928     }
1929     
1930     // so this happens periodically:
1931     scheduleCheckBlackHoles(cap);
1932     
1933     IF_DEBUG(scheduler, printAllThreads());
1934
1935     /* everybody back, start the GC.
1936      * Could do it in this thread, or signal a condition var
1937      * to do it in another thread.  Either way, we need to
1938      * broadcast on gc_pending_cond afterward.
1939      */
1940 #if defined(THREADED_RTS)
1941     IF_DEBUG(scheduler,sched_belch("doing GC"));
1942 #endif
1943     GarbageCollect(GetRoots, force_major);
1944     
1945 #if defined(SMP)
1946     // release our stash of capabilities.
1947     for (i = 0; i < n_capabilities; i++) {
1948         if (cap != &capabilities[i]) {
1949             task->cap = &capabilities[i];
1950             releaseCapability(&capabilities[i]);
1951         }
1952     }
1953     task->cap = cap;
1954 #endif
1955
1956 #if defined(GRAN)
1957     /* add a ContinueThread event to continue execution of current thread */
1958     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1959               ContinueThread,
1960               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1961     IF_GRAN_DEBUG(bq, 
1962                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1963                   G_EVENTQ(0);
1964                   G_CURR_THREADQ(0));
1965 #endif /* GRAN */
1966 }
1967
1968 /* ---------------------------------------------------------------------------
1969  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1970  * used by Control.Concurrent for error checking.
1971  * ------------------------------------------------------------------------- */
1972  
1973 StgBool
1974 rtsSupportsBoundThreads(void)
1975 {
1976 #if defined(THREADED_RTS)
1977   return rtsTrue;
1978 #else
1979   return rtsFalse;
1980 #endif
1981 }
1982
1983 /* ---------------------------------------------------------------------------
1984  * isThreadBound(tso): check whether tso is bound to an OS thread.
1985  * ------------------------------------------------------------------------- */
1986  
1987 StgBool
1988 isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
1989 {
1990 #if defined(THREADED_RTS)
1991   return (tso->bound != NULL);
1992 #endif
1993   return rtsFalse;
1994 }
1995
1996 /* ---------------------------------------------------------------------------
1997  * Singleton fork(). Do not copy any running threads.
1998  * ------------------------------------------------------------------------- */
1999
2000 #if !defined(mingw32_HOST_OS) && !defined(SMP)
2001 #define FORKPROCESS_PRIMOP_SUPPORTED
2002 #endif
2003
2004 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2005 static void 
2006 deleteThreadImmediately(Capability *cap, StgTSO *tso);
2007 #endif
2008 StgInt
2009 forkProcess(HsStablePtr *entry
2010 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2011             STG_UNUSED
2012 #endif
2013            )
2014 {
2015 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2016     pid_t pid;
2017     StgTSO* t,*next;
2018     Task *task;
2019     Capability *cap;
2020     
2021     IF_DEBUG(scheduler,sched_belch("forking!"));
2022     
2023     // ToDo: for SMP, we should probably acquire *all* the capabilities
2024     cap = rts_lock();
2025     
2026     pid = fork();
2027     
2028     if (pid) { // parent
2029         
2030         // just return the pid
2031         rts_unlock(cap);
2032         return pid;
2033         
2034     } else { // child
2035         
2036         // delete all threads
2037         cap->run_queue_hd = END_TSO_QUEUE;
2038         cap->run_queue_tl = END_TSO_QUEUE;
2039         
2040         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2041             next = t->link;
2042             
2043             // don't allow threads to catch the ThreadKilled exception
2044             deleteThreadImmediately(cap,t);
2045         }
2046         
2047         // wipe the main thread list
2048         while ((task = all_tasks) != NULL) {
2049             all_tasks = task->all_link;
2050             discardTask(task);
2051         }
2052         
2053         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2054         rts_checkSchedStatus("forkProcess",cap);
2055         
2056         rts_unlock(cap);
2057         hs_exit();                      // clean up and exit
2058         stg_exit(EXIT_SUCCESS);
2059     }
2060 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2061     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2062     return -1;
2063 #endif
2064 }
2065
2066 /* ---------------------------------------------------------------------------
2067  * Delete the threads on the run queue of the current capability.
2068  * ------------------------------------------------------------------------- */
2069    
2070 static void
2071 deleteRunQueue (Capability *cap)
2072 {
2073     StgTSO *t, *next;
2074     for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2075         ASSERT(t->what_next != ThreadRelocated);
2076         next = t->link;
2077         deleteThread(cap, t);
2078     }
2079 }
2080
2081 /* startThread and  insertThread are now in GranSim.c -- HWL */
2082
2083
2084 /* -----------------------------------------------------------------------------
2085    Managing the suspended_ccalling_tasks list.
2086    Locks required: sched_mutex
2087    -------------------------------------------------------------------------- */
2088
2089 STATIC_INLINE void
2090 suspendTask (Capability *cap, Task *task)
2091 {
2092     ASSERT(task->next == NULL && task->prev == NULL);
2093     task->next = cap->suspended_ccalling_tasks;
2094     task->prev = NULL;
2095     if (cap->suspended_ccalling_tasks) {
2096         cap->suspended_ccalling_tasks->prev = task;
2097     }
2098     cap->suspended_ccalling_tasks = task;
2099 }
2100
2101 STATIC_INLINE void
2102 recoverSuspendedTask (Capability *cap, Task *task)
2103 {
2104     if (task->prev) {
2105         task->prev->next = task->next;
2106     } else {
2107         ASSERT(cap->suspended_ccalling_tasks == task);
2108         cap->suspended_ccalling_tasks = task->next;
2109     }
2110     if (task->next) {
2111         task->next->prev = task->prev;
2112     }
2113     task->next = task->prev = NULL;
2114 }
2115
2116 /* ---------------------------------------------------------------------------
2117  * Suspending & resuming Haskell threads.
2118  * 
2119  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2120  * its capability before calling the C function.  This allows another
2121  * task to pick up the capability and carry on running Haskell
2122  * threads.  It also means that if the C call blocks, it won't lock
2123  * the whole system.
2124  *
2125  * The Haskell thread making the C call is put to sleep for the
2126  * duration of the call, on the susepended_ccalling_threads queue.  We
2127  * give out a token to the task, which it can use to resume the thread
2128  * on return from the C function.
2129  * ------------------------------------------------------------------------- */
2130    
2131 void *
2132 suspendThread (StgRegTable *reg)
2133 {
2134   Capability *cap;
2135   int saved_errno = errno;
2136   StgTSO *tso;
2137   Task *task;
2138
2139   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2140    */
2141   cap = regTableToCapability(reg);
2142
2143   task = cap->running_task;
2144   tso = cap->r.rCurrentTSO;
2145
2146   IF_DEBUG(scheduler,
2147            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2148
2149   // XXX this might not be necessary --SDM
2150   tso->what_next = ThreadRunGHC;
2151
2152   threadPaused(tso);
2153
2154   if(tso->blocked_exceptions == NULL)  {
2155       tso->why_blocked = BlockedOnCCall;
2156       tso->blocked_exceptions = END_TSO_QUEUE;
2157   } else {
2158       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2159   }
2160
2161   // Hand back capability
2162   task->suspended_tso = tso;
2163
2164   ACQUIRE_LOCK(&cap->lock);
2165
2166   suspendTask(cap,task);
2167   cap->in_haskell = rtsFalse;
2168   releaseCapability_(cap);
2169   
2170   RELEASE_LOCK(&cap->lock);
2171
2172 #if defined(THREADED_RTS)
2173   /* Preparing to leave the RTS, so ensure there's a native thread/task
2174      waiting to take over.
2175   */
2176   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2177 #endif
2178
2179   errno = saved_errno;
2180   return task;
2181 }
2182
2183 StgRegTable *
2184 resumeThread (void *task_)
2185 {
2186     StgTSO *tso;
2187     Capability *cap;
2188     int saved_errno = errno;
2189     Task *task = task_;
2190
2191     cap = task->cap;
2192     // Wait for permission to re-enter the RTS with the result.
2193     waitForReturnCapability(&cap,task);
2194     // we might be on a different capability now... but if so, our
2195     // entry on the suspended_ccalling_tasks list will also have been
2196     // migrated.
2197
2198     // Remove the thread from the suspended list
2199     recoverSuspendedTask(cap,task);
2200
2201     tso = task->suspended_tso;
2202     task->suspended_tso = NULL;
2203     tso->link = END_TSO_QUEUE;
2204     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2205     
2206     if (tso->why_blocked == BlockedOnCCall) {
2207         awakenBlockedQueue(cap,tso->blocked_exceptions);
2208         tso->blocked_exceptions = NULL;
2209     }
2210     
2211     /* Reset blocking status */
2212     tso->why_blocked  = NotBlocked;
2213     
2214     cap->r.rCurrentTSO = tso;
2215     cap->in_haskell = rtsTrue;
2216     errno = saved_errno;
2217
2218     return &cap->r;
2219 }
2220
2221 /* ---------------------------------------------------------------------------
2222  * Comparing Thread ids.
2223  *
2224  * This is used from STG land in the implementation of the
2225  * instances of Eq/Ord for ThreadIds.
2226  * ------------------------------------------------------------------------ */
2227
2228 int
2229 cmp_thread(StgPtr tso1, StgPtr tso2) 
2230
2231   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2232   StgThreadID id2 = ((StgTSO *)tso2)->id;
2233  
2234   if (id1 < id2) return (-1);
2235   if (id1 > id2) return 1;
2236   return 0;
2237 }
2238
2239 /* ---------------------------------------------------------------------------
2240  * Fetching the ThreadID from an StgTSO.
2241  *
2242  * This is used in the implementation of Show for ThreadIds.
2243  * ------------------------------------------------------------------------ */
2244 int
2245 rts_getThreadId(StgPtr tso) 
2246 {
2247   return ((StgTSO *)tso)->id;
2248 }
2249
2250 #ifdef DEBUG
2251 void
2252 labelThread(StgPtr tso, char *label)
2253 {
2254   int len;
2255   void *buf;
2256
2257   /* Caveat: Once set, you can only set the thread name to "" */
2258   len = strlen(label)+1;
2259   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2260   strncpy(buf,label,len);
2261   /* Update will free the old memory for us */
2262   updateThreadLabel(((StgTSO *)tso)->id,buf);
2263 }
2264 #endif /* DEBUG */
2265
2266 /* ---------------------------------------------------------------------------
2267    Create a new thread.
2268
2269    The new thread starts with the given stack size.  Before the
2270    scheduler can run, however, this thread needs to have a closure
2271    (and possibly some arguments) pushed on its stack.  See
2272    pushClosure() in Schedule.h.
2273
2274    createGenThread() and createIOThread() (in SchedAPI.h) are
2275    convenient packaged versions of this function.
2276
2277    currently pri (priority) is only used in a GRAN setup -- HWL
2278    ------------------------------------------------------------------------ */
2279 #if defined(GRAN)
2280 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2281 StgTSO *
2282 createThread(nat size, StgInt pri)
2283 #else
2284 StgTSO *
2285 createThread(Capability *cap, nat size)
2286 #endif
2287 {
2288     StgTSO *tso;
2289     nat stack_size;
2290
2291     /* sched_mutex is *not* required */
2292
2293     /* First check whether we should create a thread at all */
2294 #if defined(PARALLEL_HASKELL)
2295     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2296     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2297         threadsIgnored++;
2298         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2299                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2300         return END_TSO_QUEUE;
2301     }
2302     threadsCreated++;
2303 #endif
2304
2305 #if defined(GRAN)
2306     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2307 #endif
2308
2309     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2310
2311     /* catch ridiculously small stack sizes */
2312     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2313         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2314     }
2315
2316     stack_size = size - TSO_STRUCT_SIZEW;
2317     
2318     tso = (StgTSO *)allocateLocal(cap, size);
2319     TICK_ALLOC_TSO(stack_size, 0);
2320
2321     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2322 #if defined(GRAN)
2323     SET_GRAN_HDR(tso, ThisPE);
2324 #endif
2325
2326     // Always start with the compiled code evaluator
2327     tso->what_next = ThreadRunGHC;
2328
2329     tso->why_blocked  = NotBlocked;
2330     tso->blocked_exceptions = NULL;
2331     
2332     tso->saved_errno = 0;
2333     tso->bound = NULL;
2334     
2335     tso->stack_size     = stack_size;
2336     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2337                           - TSO_STRUCT_SIZEW;
2338     tso->sp             = (P_)&(tso->stack) + stack_size;
2339
2340     tso->trec = NO_TREC;
2341     
2342 #ifdef PROFILING
2343     tso->prof.CCCS = CCS_MAIN;
2344 #endif
2345     
2346   /* put a stop frame on the stack */
2347     tso->sp -= sizeofW(StgStopFrame);
2348     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2349     tso->link = END_TSO_QUEUE;
2350     
2351   // ToDo: check this
2352 #if defined(GRAN)
2353     /* uses more flexible routine in GranSim */
2354     insertThread(tso, CurrentProc);
2355 #else
2356     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2357      * from its creation
2358      */
2359 #endif
2360     
2361 #if defined(GRAN) 
2362     if (RtsFlags.GranFlags.GranSimStats.Full) 
2363         DumpGranEvent(GR_START,tso);
2364 #elif defined(PARALLEL_HASKELL)
2365     if (RtsFlags.ParFlags.ParStats.Full) 
2366         DumpGranEvent(GR_STARTQ,tso);
2367     /* HACk to avoid SCHEDULE 
2368        LastTSO = tso; */
2369 #endif
2370     
2371     /* Link the new thread on the global thread list.
2372      */
2373     ACQUIRE_LOCK(&sched_mutex);
2374     tso->id = next_thread_id++;  // while we have the mutex
2375     tso->global_link = all_threads;
2376     all_threads = tso;
2377     RELEASE_LOCK(&sched_mutex);
2378     
2379 #if defined(DIST)
2380     tso->dist.priority = MandatoryPriority; //by default that is...
2381 #endif
2382     
2383 #if defined(GRAN)
2384     tso->gran.pri = pri;
2385 # if defined(DEBUG)
2386     tso->gran.magic = TSO_MAGIC; // debugging only
2387 # endif
2388     tso->gran.sparkname   = 0;
2389     tso->gran.startedat   = CURRENT_TIME; 
2390     tso->gran.exported    = 0;
2391     tso->gran.basicblocks = 0;
2392     tso->gran.allocs      = 0;
2393     tso->gran.exectime    = 0;
2394     tso->gran.fetchtime   = 0;
2395     tso->gran.fetchcount  = 0;
2396     tso->gran.blocktime   = 0;
2397     tso->gran.blockcount  = 0;
2398     tso->gran.blockedat   = 0;
2399     tso->gran.globalsparks = 0;
2400     tso->gran.localsparks  = 0;
2401     if (RtsFlags.GranFlags.Light)
2402         tso->gran.clock  = Now; /* local clock */
2403     else
2404         tso->gran.clock  = 0;
2405     
2406     IF_DEBUG(gran,printTSO(tso));
2407 #elif defined(PARALLEL_HASKELL)
2408 # if defined(DEBUG)
2409     tso->par.magic = TSO_MAGIC; // debugging only
2410 # endif
2411     tso->par.sparkname   = 0;
2412     tso->par.startedat   = CURRENT_TIME; 
2413     tso->par.exported    = 0;
2414     tso->par.basicblocks = 0;
2415     tso->par.allocs      = 0;
2416     tso->par.exectime    = 0;
2417     tso->par.fetchtime   = 0;
2418     tso->par.fetchcount  = 0;
2419     tso->par.blocktime   = 0;
2420     tso->par.blockcount  = 0;
2421     tso->par.blockedat   = 0;
2422     tso->par.globalsparks = 0;
2423     tso->par.localsparks  = 0;
2424 #endif
2425     
2426 #if defined(GRAN)
2427     globalGranStats.tot_threads_created++;
2428     globalGranStats.threads_created_on_PE[CurrentProc]++;
2429     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2430     globalGranStats.tot_sq_probes++;
2431 #elif defined(PARALLEL_HASKELL)
2432     // collect parallel global statistics (currently done together with GC stats)
2433     if (RtsFlags.ParFlags.ParStats.Global &&
2434         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2435         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2436         globalParStats.tot_threads_created++;
2437     }
2438 #endif 
2439     
2440 #if defined(GRAN)
2441     IF_GRAN_DEBUG(pri,
2442                   sched_belch("==__ schedule: Created TSO %d (%p);",
2443                               CurrentProc, tso, tso->id));
2444 #elif defined(PARALLEL_HASKELL)
2445     IF_PAR_DEBUG(verbose,
2446                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2447                              (long)tso->id, tso, advisory_thread_count));
2448 #else
2449     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2450                                    (long)tso->id, (long)tso->stack_size));
2451 #endif    
2452     return tso;
2453 }
2454
2455 #if defined(PAR)
2456 /* RFP:
2457    all parallel thread creation calls should fall through the following routine.
2458 */
2459 StgTSO *
2460 createThreadFromSpark(rtsSpark spark) 
2461 { StgTSO *tso;
2462   ASSERT(spark != (rtsSpark)NULL);
2463 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2464   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2465   { threadsIgnored++;
2466     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2467           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2468     return END_TSO_QUEUE;
2469   }
2470   else
2471   { threadsCreated++;
2472     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2473     if (tso==END_TSO_QUEUE)     
2474       barf("createSparkThread: Cannot create TSO");
2475 #if defined(DIST)
2476     tso->priority = AdvisoryPriority;
2477 #endif
2478     pushClosure(tso,spark);
2479     addToRunQueue(tso);
2480     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2481   }
2482   return tso;
2483 }
2484 #endif
2485
2486 /*
2487   Turn a spark into a thread.
2488   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2489 */
2490 #if 0
2491 StgTSO *
2492 activateSpark (rtsSpark spark) 
2493 {
2494   StgTSO *tso;
2495
2496   tso = createSparkThread(spark);
2497   if (RtsFlags.ParFlags.ParStats.Full) {   
2498     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2499       IF_PAR_DEBUG(verbose,
2500                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2501                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2502   }
2503   // ToDo: fwd info on local/global spark to thread -- HWL
2504   // tso->gran.exported =  spark->exported;
2505   // tso->gran.locked =   !spark->global;
2506   // tso->gran.sparkname = spark->name;
2507
2508   return tso;
2509 }
2510 #endif
2511
2512 /* ---------------------------------------------------------------------------
2513  * scheduleThread()
2514  *
2515  * scheduleThread puts a thread on the end  of the runnable queue.
2516  * This will usually be done immediately after a thread is created.
2517  * The caller of scheduleThread must create the thread using e.g.
2518  * createThread and push an appropriate closure
2519  * on this thread's stack before the scheduler is invoked.
2520  * ------------------------------------------------------------------------ */
2521
2522 void
2523 scheduleThread(Capability *cap, StgTSO *tso)
2524 {
2525     // The thread goes at the *end* of the run-queue, to avoid possible
2526     // starvation of any threads already on the queue.
2527     appendToRunQueue(cap,tso);
2528 }
2529
2530 Capability *
2531 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2532 {
2533     Task *task;
2534
2535     // We already created/initialised the Task
2536     task = cap->running_task;
2537
2538     // This TSO is now a bound thread; make the Task and TSO
2539     // point to each other.
2540     tso->bound = task;
2541
2542     task->tso = tso;
2543     task->ret = ret;
2544     task->stat = NoStatus;
2545
2546     appendToRunQueue(cap,tso);
2547
2548     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2549
2550 #if defined(GRAN)
2551     /* GranSim specific init */
2552     CurrentTSO = m->tso;                // the TSO to run
2553     procStatus[MainProc] = Busy;        // status of main PE
2554     CurrentProc = MainProc;             // PE to run it on
2555 #endif
2556
2557     cap = schedule(cap,task);
2558
2559     ASSERT(task->stat != NoStatus);
2560     ASSERT_CAPABILITY_INVARIANTS(cap,task);
2561
2562     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2563     return cap;
2564 }
2565
2566 /* ----------------------------------------------------------------------------
2567  * Starting Tasks
2568  * ------------------------------------------------------------------------- */
2569
2570 #if defined(THREADED_RTS)
2571 void
2572 workerStart(Task *task)
2573 {
2574     Capability *cap;
2575
2576     // See startWorkerTask().
2577     ACQUIRE_LOCK(&task->lock);
2578     cap = task->cap;
2579     RELEASE_LOCK(&task->lock);
2580
2581     // set the thread-local pointer to the Task:
2582     taskEnter(task);
2583
2584     // schedule() runs without a lock.
2585     cap = schedule(cap,task);
2586
2587     // On exit from schedule(), we have a Capability.
2588     releaseCapability(cap);
2589     taskStop(task);
2590 }
2591 #endif
2592
2593 /* ---------------------------------------------------------------------------
2594  * initScheduler()
2595  *
2596  * Initialise the scheduler.  This resets all the queues - if the
2597  * queues contained any threads, they'll be garbage collected at the
2598  * next pass.
2599  *
2600  * ------------------------------------------------------------------------ */
2601
2602 void 
2603 initScheduler(void)
2604 {
2605 #if defined(GRAN)
2606   nat i;
2607   for (i=0; i<=MAX_PROC; i++) {
2608     run_queue_hds[i]      = END_TSO_QUEUE;
2609     run_queue_tls[i]      = END_TSO_QUEUE;
2610     blocked_queue_hds[i]  = END_TSO_QUEUE;
2611     blocked_queue_tls[i]  = END_TSO_QUEUE;
2612     ccalling_threadss[i]  = END_TSO_QUEUE;
2613     blackhole_queue[i]    = END_TSO_QUEUE;
2614     sleeping_queue        = END_TSO_QUEUE;
2615   }
2616 #elif !defined(THREADED_RTS)
2617   blocked_queue_hd  = END_TSO_QUEUE;
2618   blocked_queue_tl  = END_TSO_QUEUE;
2619   sleeping_queue    = END_TSO_QUEUE;
2620 #endif
2621
2622   blackhole_queue   = END_TSO_QUEUE;
2623   all_threads       = END_TSO_QUEUE;
2624
2625   context_switch = 0;
2626   interrupted    = 0;
2627
2628   RtsFlags.ConcFlags.ctxtSwitchTicks =
2629       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2630       
2631 #if defined(THREADED_RTS)
2632   /* Initialise the mutex and condition variables used by
2633    * the scheduler. */
2634   initMutex(&sched_mutex);
2635 #endif
2636   
2637   ACQUIRE_LOCK(&sched_mutex);
2638
2639   /* A capability holds the state a native thread needs in
2640    * order to execute STG code. At least one capability is
2641    * floating around (only SMP builds have more than one).
2642    */
2643   initCapabilities();
2644
2645   initTaskManager();
2646
2647 #if defined(SMP)
2648   /*
2649    * Eagerly start one worker to run each Capability, except for
2650    * Capability 0.  The idea is that we're probably going to start a
2651    * bound thread on Capability 0 pretty soon, so we don't want a
2652    * worker task hogging it.
2653    */
2654   { 
2655       nat i;
2656       Capability *cap;
2657       for (i = 1; i < n_capabilities; i++) {
2658           cap = &capabilities[i];
2659           ACQUIRE_LOCK(&cap->lock);
2660           startWorkerTask(cap, workerStart);
2661           RELEASE_LOCK(&cap->lock);
2662       }
2663   }
2664 #endif
2665
2666 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2667   initSparkPools();
2668 #endif
2669
2670   RELEASE_LOCK(&sched_mutex);
2671 }
2672
2673 void
2674 exitScheduler( void )
2675 {
2676     interrupted = rtsTrue;
2677     shutting_down_scheduler = rtsTrue;
2678
2679 #if defined(THREADED_RTS)
2680     { 
2681         Task *task;
2682         nat i;
2683         
2684         ACQUIRE_LOCK(&sched_mutex);
2685         task = newBoundTask();
2686         RELEASE_LOCK(&sched_mutex);
2687
2688         for (i = 0; i < n_capabilities; i++) {
2689             shutdownCapability(&capabilities[i], task);
2690         }
2691         boundTaskExiting(task);
2692         stopTaskManager();
2693     }
2694 #endif
2695 }
2696
2697 /* ---------------------------------------------------------------------------
2698    Where are the roots that we know about?
2699
2700         - all the threads on the runnable queue
2701         - all the threads on the blocked queue
2702         - all the threads on the sleeping queue
2703         - all the thread currently executing a _ccall_GC
2704         - all the "main threads"
2705      
2706    ------------------------------------------------------------------------ */
2707
2708 /* This has to be protected either by the scheduler monitor, or by the
2709         garbage collection monitor (probably the latter).
2710         KH @ 25/10/99
2711 */
2712
2713 void
2714 GetRoots( evac_fn evac )
2715 {
2716     nat i;
2717     Capability *cap;
2718     Task *task;
2719
2720 #if defined(GRAN)
2721     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2722         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2723             evac((StgClosure **)&run_queue_hds[i]);
2724         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2725             evac((StgClosure **)&run_queue_tls[i]);
2726         
2727         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2728             evac((StgClosure **)&blocked_queue_hds[i]);
2729         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2730             evac((StgClosure **)&blocked_queue_tls[i]);
2731         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2732             evac((StgClosure **)&ccalling_threads[i]);
2733     }
2734
2735     markEventQueue();
2736
2737 #else /* !GRAN */
2738
2739     for (i = 0; i < n_capabilities; i++) {
2740         cap = &capabilities[i];
2741         evac((StgClosure **)&cap->run_queue_hd);
2742         evac((StgClosure **)&cap->run_queue_tl);
2743         
2744         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2745              task=task->next) {
2746             evac((StgClosure **)&task->suspended_tso);
2747         }
2748     }
2749     
2750 #if !defined(THREADED_RTS)
2751     evac((StgClosure **)&blocked_queue_hd);
2752     evac((StgClosure **)&blocked_queue_tl);
2753     evac((StgClosure **)&sleeping_queue);
2754 #endif 
2755 #endif
2756
2757     evac((StgClosure **)&blackhole_queue);
2758
2759 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2760     markSparkQueue(evac);
2761 #endif
2762     
2763 #if defined(RTS_USER_SIGNALS)
2764     // mark the signal handlers (signals should be already blocked)
2765     markSignalHandlers(evac);
2766 #endif
2767 }
2768
2769 /* -----------------------------------------------------------------------------
2770    performGC
2771
2772    This is the interface to the garbage collector from Haskell land.
2773    We provide this so that external C code can allocate and garbage
2774    collect when called from Haskell via _ccall_GC.
2775
2776    It might be useful to provide an interface whereby the programmer
2777    can specify more roots (ToDo).
2778    
2779    This needs to be protected by the GC condition variable above.  KH.
2780    -------------------------------------------------------------------------- */
2781
2782 static void (*extra_roots)(evac_fn);
2783
2784 void
2785 performGC(void)
2786 {
2787 #ifdef THREADED_RTS
2788     // ToDo: we have to grab all the capabilities here.
2789     errorBelch("performGC not supported in threaded RTS (yet)");
2790     stg_exit(EXIT_FAILURE);
2791 #endif
2792     /* Obligated to hold this lock upon entry */
2793     GarbageCollect(GetRoots,rtsFalse);
2794 }
2795
2796 void
2797 performMajorGC(void)
2798 {
2799 #ifdef THREADED_RTS
2800     errorBelch("performMayjorGC not supported in threaded RTS (yet)");
2801     stg_exit(EXIT_FAILURE);
2802 #endif
2803     GarbageCollect(GetRoots,rtsTrue);
2804 }
2805
2806 static void
2807 AllRoots(evac_fn evac)
2808 {
2809     GetRoots(evac);             // the scheduler's roots
2810     extra_roots(evac);          // the user's roots
2811 }
2812
2813 void
2814 performGCWithRoots(void (*get_roots)(evac_fn))
2815 {
2816 #ifdef THREADED_RTS
2817     errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
2818     stg_exit(EXIT_FAILURE);
2819 #endif
2820     extra_roots = get_roots;
2821     GarbageCollect(AllRoots,rtsFalse);
2822 }
2823
2824 /* -----------------------------------------------------------------------------
2825    Stack overflow
2826
2827    If the thread has reached its maximum stack size, then raise the
2828    StackOverflow exception in the offending thread.  Otherwise
2829    relocate the TSO into a larger chunk of memory and adjust its stack
2830    size appropriately.
2831    -------------------------------------------------------------------------- */
2832
2833 static StgTSO *
2834 threadStackOverflow(Capability *cap, StgTSO *tso)
2835 {
2836   nat new_stack_size, stack_words;
2837   lnat new_tso_size;
2838   StgPtr new_sp;
2839   StgTSO *dest;
2840
2841   IF_DEBUG(sanity,checkTSO(tso));
2842   if (tso->stack_size >= tso->max_stack_size) {
2843
2844     IF_DEBUG(gc,
2845              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2846                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2847              /* If we're debugging, just print out the top of the stack */
2848              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2849                                               tso->sp+64)));
2850
2851     /* Send this thread the StackOverflow exception */
2852     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2853     return tso;
2854   }
2855
2856   /* Try to double the current stack size.  If that takes us over the
2857    * maximum stack size for this thread, then use the maximum instead.
2858    * Finally round up so the TSO ends up as a whole number of blocks.
2859    */
2860   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2861   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2862                                        TSO_STRUCT_SIZE)/sizeof(W_);
2863   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2864   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2865
2866   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", tso->stack_size, new_stack_size));
2867
2868   dest = (StgTSO *)allocate(new_tso_size);
2869   TICK_ALLOC_TSO(new_stack_size,0);
2870
2871   /* copy the TSO block and the old stack into the new area */
2872   memcpy(dest,tso,TSO_STRUCT_SIZE);
2873   stack_words = tso->stack + tso->stack_size - tso->sp;
2874   new_sp = (P_)dest + new_tso_size - stack_words;
2875   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2876
2877   /* relocate the stack pointers... */
2878   dest->sp         = new_sp;
2879   dest->stack_size = new_stack_size;
2880         
2881   /* Mark the old TSO as relocated.  We have to check for relocated
2882    * TSOs in the garbage collector and any primops that deal with TSOs.
2883    *
2884    * It's important to set the sp value to just beyond the end
2885    * of the stack, so we don't attempt to scavenge any part of the
2886    * dead TSO's stack.
2887    */
2888   tso->what_next = ThreadRelocated;
2889   tso->link = dest;
2890   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2891   tso->why_blocked = NotBlocked;
2892
2893   IF_PAR_DEBUG(verbose,
2894                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2895                      tso->id, tso, tso->stack_size);
2896                /* If we're debugging, just print out the top of the stack */
2897                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2898                                                 tso->sp+64)));
2899   
2900   IF_DEBUG(sanity,checkTSO(tso));
2901 #if 0
2902   IF_DEBUG(scheduler,printTSO(dest));
2903 #endif
2904
2905   return dest;
2906 }
2907
2908 /* ---------------------------------------------------------------------------
2909    Wake up a queue that was blocked on some resource.
2910    ------------------------------------------------------------------------ */
2911
2912 #if defined(GRAN)
2913 STATIC_INLINE void
2914 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2915 {
2916 }
2917 #elif defined(PARALLEL_HASKELL)
2918 STATIC_INLINE void
2919 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2920 {
2921   /* write RESUME events to log file and
2922      update blocked and fetch time (depending on type of the orig closure) */
2923   if (RtsFlags.ParFlags.ParStats.Full) {
2924     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2925                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2926                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2927     if (emptyRunQueue())
2928       emitSchedule = rtsTrue;
2929
2930     switch (get_itbl(node)->type) {
2931         case FETCH_ME_BQ:
2932           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2933           break;
2934         case RBH:
2935         case FETCH_ME:
2936         case BLACKHOLE_BQ:
2937           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2938           break;
2939 #ifdef DIST
2940         case MVAR:
2941           break;
2942 #endif    
2943         default:
2944           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2945         }
2946       }
2947 }
2948 #endif
2949
2950 #if defined(GRAN)
2951 StgBlockingQueueElement *
2952 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2953 {
2954     StgTSO *tso;
2955     PEs node_loc, tso_loc;
2956
2957     node_loc = where_is(node); // should be lifted out of loop
2958     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2959     tso_loc = where_is((StgClosure *)tso);
2960     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2961       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2962       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2963       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2964       // insertThread(tso, node_loc);
2965       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2966                 ResumeThread,
2967                 tso, node, (rtsSpark*)NULL);
2968       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2969       // len_local++;
2970       // len++;
2971     } else { // TSO is remote (actually should be FMBQ)
2972       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2973                                   RtsFlags.GranFlags.Costs.gunblocktime +
2974                                   RtsFlags.GranFlags.Costs.latency;
2975       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2976                 UnblockThread,
2977                 tso, node, (rtsSpark*)NULL);
2978       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2979       // len++;
2980     }
2981     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2982     IF_GRAN_DEBUG(bq,
2983                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2984                           (node_loc==tso_loc ? "Local" : "Global"), 
2985                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2986     tso->block_info.closure = NULL;
2987     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2988                              tso->id, tso));
2989 }
2990 #elif defined(PARALLEL_HASKELL)
2991 StgBlockingQueueElement *
2992 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2993 {
2994     StgBlockingQueueElement *next;
2995
2996     switch (get_itbl(bqe)->type) {
2997     case TSO:
2998       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2999       /* if it's a TSO just push it onto the run_queue */
3000       next = bqe->link;
3001       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3002       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3003       threadRunnable();
3004       unblockCount(bqe, node);
3005       /* reset blocking status after dumping event */
3006       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3007       break;
3008
3009     case BLOCKED_FETCH:
3010       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3011       next = bqe->link;
3012       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3013       PendingFetches = (StgBlockedFetch *)bqe;
3014       break;
3015
3016 # if defined(DEBUG)
3017       /* can ignore this case in a non-debugging setup; 
3018          see comments on RBHSave closures above */
3019     case CONSTR:
3020       /* check that the closure is an RBHSave closure */
3021       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3022              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3023              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3024       break;
3025
3026     default:
3027       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3028            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3029            (StgClosure *)bqe);
3030 # endif
3031     }
3032   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3033   return next;
3034 }
3035 #endif
3036
3037 StgTSO *
3038 unblockOne(Capability *cap, StgTSO *tso)
3039 {
3040   StgTSO *next;
3041
3042   ASSERT(get_itbl(tso)->type == TSO);
3043   ASSERT(tso->why_blocked != NotBlocked);
3044   tso->why_blocked = NotBlocked;
3045   next = tso->link;
3046   tso->link = END_TSO_QUEUE;
3047
3048   // We might have just migrated this TSO to our Capability:
3049   if (tso->bound) {
3050       tso->bound->cap = cap;
3051   }
3052
3053   appendToRunQueue(cap,tso);
3054
3055   // we're holding a newly woken thread, make sure we context switch
3056   // quickly so we can migrate it if necessary.
3057   context_switch = 1;
3058   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3059   return next;
3060 }
3061
3062
3063 #if defined(GRAN)
3064 void 
3065 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3066 {
3067   StgBlockingQueueElement *bqe;
3068   PEs node_loc;
3069   nat len = 0; 
3070
3071   IF_GRAN_DEBUG(bq, 
3072                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3073                       node, CurrentProc, CurrentTime[CurrentProc], 
3074                       CurrentTSO->id, CurrentTSO));
3075
3076   node_loc = where_is(node);
3077
3078   ASSERT(q == END_BQ_QUEUE ||
3079          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3080          get_itbl(q)->type == CONSTR); // closure (type constructor)
3081   ASSERT(is_unique(node));
3082
3083   /* FAKE FETCH: magically copy the node to the tso's proc;
3084      no Fetch necessary because in reality the node should not have been 
3085      moved to the other PE in the first place
3086   */
3087   if (CurrentProc!=node_loc) {
3088     IF_GRAN_DEBUG(bq, 
3089                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3090                         node, node_loc, CurrentProc, CurrentTSO->id, 
3091                         // CurrentTSO, where_is(CurrentTSO),
3092                         node->header.gran.procs));
3093     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3094     IF_GRAN_DEBUG(bq, 
3095                   debugBelch("## new bitmask of node %p is %#x\n",
3096                         node, node->header.gran.procs));
3097     if (RtsFlags.GranFlags.GranSimStats.Global) {
3098       globalGranStats.tot_fake_fetches++;
3099     }
3100   }
3101
3102   bqe = q;
3103   // ToDo: check: ASSERT(CurrentProc==node_loc);
3104   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3105     //next = bqe->link;
3106     /* 
3107        bqe points to the current element in the queue
3108        next points to the next element in the queue
3109     */
3110     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3111     //tso_loc = where_is(tso);
3112     len++;
3113     bqe = unblockOne(bqe, node);
3114   }
3115
3116   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3117      the closure to make room for the anchor of the BQ */
3118   if (bqe!=END_BQ_QUEUE) {
3119     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3120     /*
3121     ASSERT((info_ptr==&RBH_Save_0_info) ||
3122            (info_ptr==&RBH_Save_1_info) ||
3123            (info_ptr==&RBH_Save_2_info));
3124     */
3125     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3126     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3127     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3128
3129     IF_GRAN_DEBUG(bq,
3130                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3131                         node, info_type(node)));
3132   }
3133
3134   /* statistics gathering */
3135   if (RtsFlags.GranFlags.GranSimStats.Global) {
3136     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3137     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3138     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3139     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3140   }
3141   IF_GRAN_DEBUG(bq,
3142                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3143                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3144 }
3145 #elif defined(PARALLEL_HASKELL)
3146 void 
3147 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3148 {
3149   StgBlockingQueueElement *bqe;
3150
3151   IF_PAR_DEBUG(verbose, 
3152                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3153                      node, mytid));
3154 #ifdef DIST  
3155   //RFP
3156   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3157     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3158     return;
3159   }
3160 #endif
3161   
3162   ASSERT(q == END_BQ_QUEUE ||
3163          get_itbl(q)->type == TSO ||           
3164          get_itbl(q)->type == BLOCKED_FETCH || 
3165          get_itbl(q)->type == CONSTR); 
3166
3167   bqe = q;
3168   while (get_itbl(bqe)->type==TSO || 
3169          get_itbl(bqe)->type==BLOCKED_FETCH) {
3170     bqe = unblockOne(bqe, node);
3171   }
3172 }
3173
3174 #else   /* !GRAN && !PARALLEL_HASKELL */
3175
3176 void
3177 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3178 {
3179     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3180                              // Exception.cmm
3181     while (tso != END_TSO_QUEUE) {
3182         tso = unblockOne(cap,tso);
3183     }
3184 }
3185 #endif
3186
3187 /* ---------------------------------------------------------------------------
3188    Interrupt execution
3189    - usually called inside a signal handler so it mustn't do anything fancy.   
3190    ------------------------------------------------------------------------ */
3191
3192 void
3193 interruptStgRts(void)
3194 {
3195     interrupted    = 1;
3196     context_switch = 1;
3197 #if defined(THREADED_RTS)
3198     prodAllCapabilities();
3199 #endif
3200 }
3201
3202 /* -----------------------------------------------------------------------------
3203    Unblock a thread
3204
3205    This is for use when we raise an exception in another thread, which
3206    may be blocked.
3207    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3208    -------------------------------------------------------------------------- */
3209
3210 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3211 /*
3212   NB: only the type of the blocking queue is different in GranSim and GUM
3213       the operations on the queue-elements are the same
3214       long live polymorphism!
3215
3216   Locks: sched_mutex is held upon entry and exit.
3217
3218 */
3219 static void
3220 unblockThread(Capability *cap, StgTSO *tso)
3221 {
3222   StgBlockingQueueElement *t, **last;
3223
3224   switch (tso->why_blocked) {
3225
3226   case NotBlocked:
3227     return;  /* not blocked */
3228
3229   case BlockedOnSTM:
3230     // Be careful: nothing to do here!  We tell the scheduler that the thread
3231     // is runnable and we leave it to the stack-walking code to abort the 
3232     // transaction while unwinding the stack.  We should perhaps have a debugging
3233     // test to make sure that this really happens and that the 'zombie' transaction
3234     // does not get committed.
3235     goto done;
3236
3237   case BlockedOnMVar:
3238     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3239     {
3240       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3241       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3242
3243       last = (StgBlockingQueueElement **)&mvar->head;
3244       for (t = (StgBlockingQueueElement *)mvar->head; 
3245            t != END_BQ_QUEUE; 
3246            last = &t->link, last_tso = t, t = t->link) {
3247         if (t == (StgBlockingQueueElement *)tso) {
3248           *last = (StgBlockingQueueElement *)tso->link;
3249           if (mvar->tail == tso) {
3250             mvar->tail = (StgTSO *)last_tso;
3251           }
3252           goto done;
3253         }
3254       }
3255       barf("unblockThread (MVAR): TSO not found");
3256     }
3257
3258   case BlockedOnBlackHole:
3259     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3260     {
3261       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3262
3263       last = &bq->blocking_queue;
3264       for (t = bq->blocking_queue; 
3265            t != END_BQ_QUEUE; 
3266            last = &t->link, t = t->link) {
3267         if (t == (StgBlockingQueueElement *)tso) {
3268           *last = (StgBlockingQueueElement *)tso->link;
3269           goto done;
3270         }
3271       }
3272       barf("unblockThread (BLACKHOLE): TSO not found");
3273     }
3274
3275   case BlockedOnException:
3276     {
3277       StgTSO *target  = tso->block_info.tso;
3278
3279       ASSERT(get_itbl(target)->type == TSO);
3280
3281       if (target->what_next == ThreadRelocated) {
3282           target = target->link;
3283           ASSERT(get_itbl(target)->type == TSO);
3284       }
3285
3286       ASSERT(target->blocked_exceptions != NULL);
3287
3288       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3289       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3290            t != END_BQ_QUEUE; 
3291            last = &t->link, t = t->link) {
3292         ASSERT(get_itbl(t)->type == TSO);
3293         if (t == (StgBlockingQueueElement *)tso) {
3294           *last = (StgBlockingQueueElement *)tso->link;
3295           goto done;
3296         }
3297       }
3298       barf("unblockThread (Exception): TSO not found");
3299     }
3300
3301   case BlockedOnRead:
3302   case BlockedOnWrite:
3303 #if defined(mingw32_HOST_OS)
3304   case BlockedOnDoProc:
3305 #endif
3306     {
3307       /* take TSO off blocked_queue */
3308       StgBlockingQueueElement *prev = NULL;
3309       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3310            prev = t, t = t->link) {
3311         if (t == (StgBlockingQueueElement *)tso) {
3312           if (prev == NULL) {
3313             blocked_queue_hd = (StgTSO *)t->link;
3314             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3315               blocked_queue_tl = END_TSO_QUEUE;
3316             }
3317           } else {
3318             prev->link = t->link;
3319             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3320               blocked_queue_tl = (StgTSO *)prev;
3321             }
3322           }
3323 #if defined(mingw32_HOST_OS)
3324           /* (Cooperatively) signal that the worker thread should abort
3325            * the request.
3326            */
3327           abandonWorkRequest(tso->block_info.async_result->reqID);
3328 #endif
3329           goto done;
3330         }
3331       }
3332       barf("unblockThread (I/O): TSO not found");
3333     }
3334
3335   case BlockedOnDelay:
3336     {
3337       /* take TSO off sleeping_queue */
3338       StgBlockingQueueElement *prev = NULL;
3339       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3340            prev = t, t = t->link) {
3341         if (t == (StgBlockingQueueElement *)tso) {
3342           if (prev == NULL) {
3343             sleeping_queue = (StgTSO *)t->link;
3344           } else {
3345             prev->link = t->link;
3346           }
3347           goto done;
3348         }
3349       }
3350       barf("unblockThread (delay): TSO not found");
3351     }
3352
3353   default:
3354     barf("unblockThread");
3355   }
3356
3357  done:
3358   tso->link = END_TSO_QUEUE;
3359   tso->why_blocked = NotBlocked;
3360   tso->block_info.closure = NULL;
3361   pushOnRunQueue(cap,tso);
3362 }
3363 #else
3364 static void
3365 unblockThread(Capability *cap, StgTSO *tso)
3366 {
3367   StgTSO *t, **last;
3368   
3369   /* To avoid locking unnecessarily. */
3370   if (tso->why_blocked == NotBlocked) {
3371     return;
3372   }
3373
3374   switch (tso->why_blocked) {
3375
3376   case BlockedOnSTM:
3377     // Be careful: nothing to do here!  We tell the scheduler that the thread
3378     // is runnable and we leave it to the stack-walking code to abort the 
3379     // transaction while unwinding the stack.  We should perhaps have a debugging
3380     // test to make sure that this really happens and that the 'zombie' transaction
3381     // does not get committed.
3382     goto done;
3383
3384   case BlockedOnMVar:
3385     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3386     {
3387       StgTSO *last_tso = END_TSO_QUEUE;
3388       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3389
3390       last = &mvar->head;
3391       for (t = mvar->head; t != END_TSO_QUEUE; 
3392            last = &t->link, last_tso = t, t = t->link) {
3393         if (t == tso) {
3394           *last = tso->link;
3395           if (mvar->tail == tso) {
3396             mvar->tail = last_tso;
3397           }
3398           goto done;
3399         }
3400       }
3401       barf("unblockThread (MVAR): TSO not found");
3402     }
3403
3404   case BlockedOnBlackHole:
3405     {
3406       last = &blackhole_queue;
3407       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3408            last = &t->link, t = t->link) {
3409         if (t == tso) {
3410           *last = tso->link;
3411           goto done;
3412         }
3413       }
3414       barf("unblockThread (BLACKHOLE): TSO not found");
3415     }
3416
3417   case BlockedOnException:
3418     {
3419       StgTSO *target  = tso->block_info.tso;
3420
3421       ASSERT(get_itbl(target)->type == TSO);
3422
3423       while (target->what_next == ThreadRelocated) {
3424           target = target->link;
3425           ASSERT(get_itbl(target)->type == TSO);
3426       }
3427       
3428       ASSERT(target->blocked_exceptions != NULL);
3429
3430       last = &target->blocked_exceptions;
3431       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3432            last = &t->link, t = t->link) {
3433         ASSERT(get_itbl(t)->type == TSO);
3434         if (t == tso) {
3435           *last = tso->link;
3436           goto done;
3437         }
3438       }
3439       barf("unblockThread (Exception): TSO not found");
3440     }
3441
3442 #if !defined(THREADED_RTS)
3443   case BlockedOnRead:
3444   case BlockedOnWrite:
3445 #if defined(mingw32_HOST_OS)
3446   case BlockedOnDoProc:
3447 #endif
3448     {
3449       StgTSO *prev = NULL;
3450       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3451            prev = t, t = t->link) {
3452         if (t == tso) {
3453           if (prev == NULL) {
3454             blocked_queue_hd = t->link;
3455             if (blocked_queue_tl == t) {
3456               blocked_queue_tl = END_TSO_QUEUE;
3457             }
3458           } else {
3459             prev->link = t->link;
3460             if (blocked_queue_tl == t) {
3461               blocked_queue_tl = prev;
3462             }
3463           }
3464 #if defined(mingw32_HOST_OS)
3465           /* (Cooperatively) signal that the worker thread should abort
3466            * the request.
3467            */
3468           abandonWorkRequest(tso->block_info.async_result->reqID);
3469 #endif
3470           goto done;
3471         }
3472       }
3473       barf("unblockThread (I/O): TSO not found");
3474     }
3475
3476   case BlockedOnDelay:
3477     {
3478       StgTSO *prev = NULL;
3479       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3480            prev = t, t = t->link) {
3481         if (t == tso) {
3482           if (prev == NULL) {
3483             sleeping_queue = t->link;
3484           } else {
3485             prev->link = t->link;
3486           }
3487           goto done;
3488         }
3489       }
3490       barf("unblockThread (delay): TSO not found");
3491     }
3492 #endif
3493
3494   default:
3495     barf("unblockThread");
3496   }
3497
3498  done:
3499   tso->link = END_TSO_QUEUE;
3500   tso->why_blocked = NotBlocked;
3501   tso->block_info.closure = NULL;
3502   appendToRunQueue(cap,tso);
3503 }
3504 #endif
3505
3506 /* -----------------------------------------------------------------------------
3507  * checkBlackHoles()
3508  *
3509  * Check the blackhole_queue for threads that can be woken up.  We do
3510  * this periodically: before every GC, and whenever the run queue is
3511  * empty.
3512  *
3513  * An elegant solution might be to just wake up all the blocked
3514  * threads with awakenBlockedQueue occasionally: they'll go back to
3515  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3516  * doesn't give us a way to tell whether we've actually managed to
3517  * wake up any threads, so we would be busy-waiting.
3518  *
3519  * -------------------------------------------------------------------------- */
3520
3521 static rtsBool
3522 checkBlackHoles (Capability *cap)
3523 {
3524     StgTSO **prev, *t;
3525     rtsBool any_woke_up = rtsFalse;
3526     StgHalfWord type;
3527
3528     // blackhole_queue is global:
3529     ASSERT_LOCK_HELD(&sched_mutex);
3530
3531     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3532
3533     // ASSUMES: sched_mutex
3534     prev = &blackhole_queue;
3535     t = blackhole_queue;
3536     while (t != END_TSO_QUEUE) {
3537         ASSERT(t->why_blocked == BlockedOnBlackHole);
3538         type = get_itbl(t->block_info.closure)->type;
3539         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3540             IF_DEBUG(sanity,checkTSO(t));
3541             t = unblockOne(cap, t);
3542             // urk, the threads migrate to the current capability
3543             // here, but we'd like to keep them on the original one.
3544             *prev = t;
3545             any_woke_up = rtsTrue;
3546         } else {
3547             prev = &t->link;
3548             t = t->link;
3549         }
3550     }
3551
3552     return any_woke_up;
3553 }
3554
3555 /* -----------------------------------------------------------------------------
3556  * raiseAsync()
3557  *
3558  * The following function implements the magic for raising an
3559  * asynchronous exception in an existing thread.
3560  *
3561  * We first remove the thread from any queue on which it might be
3562  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3563  *
3564  * We strip the stack down to the innermost CATCH_FRAME, building
3565  * thunks in the heap for all the active computations, so they can 
3566  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3567  * an application of the handler to the exception, and push it on
3568  * the top of the stack.
3569  * 
3570  * How exactly do we save all the active computations?  We create an
3571  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3572  * AP_STACKs pushes everything from the corresponding update frame
3573  * upwards onto the stack.  (Actually, it pushes everything up to the
3574  * next update frame plus a pointer to the next AP_STACK object.
3575  * Entering the next AP_STACK object pushes more onto the stack until we
3576  * reach the last AP_STACK object - at which point the stack should look
3577  * exactly as it did when we killed the TSO and we can continue
3578  * execution by entering the closure on top of the stack.
3579  *
3580  * We can also kill a thread entirely - this happens if either (a) the 
3581  * exception passed to raiseAsync is NULL, or (b) there's no
3582  * CATCH_FRAME on the stack.  In either case, we strip the entire
3583  * stack and replace the thread with a zombie.
3584  *
3585  * ToDo: in SMP mode, this function is only safe if either (a) we hold
3586  * all the Capabilities (eg. in GC), or (b) we own the Capability that
3587  * the TSO is currently blocked on or on the run queue of.
3588  *
3589  * -------------------------------------------------------------------------- */
3590  
3591 void
3592 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3593 {
3594     raiseAsync_(cap, tso, exception, rtsFalse);
3595 }
3596
3597 static void
3598 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3599             rtsBool stop_at_atomically)
3600 {
3601     StgRetInfoTable *info;
3602     StgPtr sp;
3603   
3604     // Thread already dead?
3605     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3606         return;
3607     }
3608
3609     IF_DEBUG(scheduler, 
3610              sched_belch("raising exception in thread %ld.", (long)tso->id));
3611     
3612     // Remove it from any blocking queues
3613     unblockThread(cap,tso);
3614
3615     sp = tso->sp;
3616     
3617     // The stack freezing code assumes there's a closure pointer on
3618     // the top of the stack, so we have to arrange that this is the case...
3619     //
3620     if (sp[0] == (W_)&stg_enter_info) {
3621         sp++;
3622     } else {
3623         sp--;
3624         sp[0] = (W_)&stg_dummy_ret_closure;
3625     }
3626
3627     while (1) {
3628         nat i;
3629
3630         // 1. Let the top of the stack be the "current closure"
3631         //
3632         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3633         // CATCH_FRAME.
3634         //
3635         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3636         // current closure applied to the chunk of stack up to (but not
3637         // including) the update frame.  This closure becomes the "current
3638         // closure".  Go back to step 2.
3639         //
3640         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3641         // top of the stack applied to the exception.
3642         // 
3643         // 5. If it's a STOP_FRAME, then kill the thread.
3644         // 
3645         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3646         // transaction
3647        
3648         
3649         StgPtr frame;
3650         
3651         frame = sp + 1;
3652         info = get_ret_itbl((StgClosure *)frame);
3653         
3654         while (info->i.type != UPDATE_FRAME
3655                && (info->i.type != CATCH_FRAME || exception == NULL)
3656                && info->i.type != STOP_FRAME
3657                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3658         {
3659             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3660               // IF we find an ATOMICALLY_FRAME then we abort the
3661               // current transaction and propagate the exception.  In
3662               // this case (unlike ordinary exceptions) we do not care
3663               // whether the transaction is valid or not because its
3664               // possible validity cannot have caused the exception
3665               // and will not be visible after the abort.
3666               IF_DEBUG(stm,
3667                        debugBelch("Found atomically block delivering async exception\n"));
3668               stmAbortTransaction(tso -> trec);
3669               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3670             }
3671             frame += stack_frame_sizeW((StgClosure *)frame);
3672             info = get_ret_itbl((StgClosure *)frame);
3673         }
3674         
3675         switch (info->i.type) {
3676             
3677         case ATOMICALLY_FRAME:
3678             ASSERT(stop_at_atomically);
3679             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3680             stmCondemnTransaction(tso -> trec);
3681 #ifdef REG_R1
3682             tso->sp = frame;
3683 #else
3684             // R1 is not a register: the return convention for IO in
3685             // this case puts the return value on the stack, so we
3686             // need to set up the stack to return to the atomically
3687             // frame properly...
3688             tso->sp = frame - 2;
3689             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3690             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3691 #endif
3692             tso->what_next = ThreadRunGHC;
3693             return;
3694
3695         case CATCH_FRAME:
3696             // If we find a CATCH_FRAME, and we've got an exception to raise,
3697             // then build the THUNK raise(exception), and leave it on
3698             // top of the CATCH_FRAME ready to enter.
3699             //
3700         {
3701 #ifdef PROFILING
3702             StgCatchFrame *cf = (StgCatchFrame *)frame;
3703 #endif
3704             StgThunk *raise;
3705             
3706             // we've got an exception to raise, so let's pass it to the
3707             // handler in this frame.
3708             //
3709             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3710             TICK_ALLOC_SE_THK(1,0);
3711             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3712             raise->payload[0] = exception;
3713             
3714             // throw away the stack from Sp up to the CATCH_FRAME.
3715             //
3716             sp = frame - 1;
3717             
3718             /* Ensure that async excpetions are blocked now, so we don't get
3719              * a surprise exception before we get around to executing the
3720              * handler.
3721              */
3722             if (tso->blocked_exceptions == NULL) {
3723                 tso->blocked_exceptions = END_TSO_QUEUE;
3724             }
3725             
3726             /* Put the newly-built THUNK on top of the stack, ready to execute
3727              * when the thread restarts.
3728              */
3729             sp[0] = (W_)raise;
3730             sp[-1] = (W_)&stg_enter_info;
3731             tso->sp = sp-1;
3732             tso->what_next = ThreadRunGHC;
3733             IF_DEBUG(sanity, checkTSO(tso));
3734             return;
3735         }
3736         
3737         case UPDATE_FRAME:
3738         {
3739             StgAP_STACK * ap;
3740             nat words;
3741             
3742             // First build an AP_STACK consisting of the stack chunk above the
3743             // current update frame, with the top word on the stack as the
3744             // fun field.
3745             //
3746             words = frame - sp - 1;
3747             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3748             
3749             ap->size = words;
3750             ap->fun  = (StgClosure *)sp[0];
3751             sp++;
3752             for(i=0; i < (nat)words; ++i) {
3753                 ap->payload[i] = (StgClosure *)*sp++;
3754             }
3755             
3756             SET_HDR(ap,&stg_AP_STACK_info,
3757                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3758             TICK_ALLOC_UP_THK(words+1,0);
3759             
3760             IF_DEBUG(scheduler,
3761                      debugBelch("sched: Updating ");
3762                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3763                      debugBelch(" with ");
3764                      printObj((StgClosure *)ap);
3765                 );
3766
3767             // Replace the updatee with an indirection - happily
3768             // this will also wake up any threads currently
3769             // waiting on the result.
3770             //
3771             // Warning: if we're in a loop, more than one update frame on
3772             // the stack may point to the same object.  Be careful not to
3773             // overwrite an IND_OLDGEN in this case, because we'll screw
3774             // up the mutable lists.  To be on the safe side, don't
3775             // overwrite any kind of indirection at all.  See also
3776             // threadSqueezeStack in GC.c, where we have to make a similar
3777             // check.
3778             //
3779             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3780                 // revert the black hole
3781                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3782                                (StgClosure *)ap);
3783             }
3784             sp += sizeofW(StgUpdateFrame) - 1;
3785             sp[0] = (W_)ap; // push onto stack
3786             break;
3787         }
3788         
3789         case STOP_FRAME:
3790             // We've stripped the entire stack, the thread is now dead.
3791             sp += sizeofW(StgStopFrame);
3792             tso->what_next = ThreadKilled;
3793             tso->sp = sp;
3794             return;
3795             
3796         default:
3797             barf("raiseAsync");
3798         }
3799     }
3800     barf("raiseAsync");
3801 }
3802
3803 /* -----------------------------------------------------------------------------
3804    Deleting threads
3805
3806    This is used for interruption (^C) and forking, and corresponds to
3807    raising an exception but without letting the thread catch the
3808    exception.
3809    -------------------------------------------------------------------------- */
3810
3811 static void 
3812 deleteThread (Capability *cap, StgTSO *tso)
3813 {
3814   if (tso->why_blocked != BlockedOnCCall &&
3815       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3816       raiseAsync(cap,tso,NULL);
3817   }
3818 }
3819
3820 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3821 static void 
3822 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3823 { // for forkProcess only:
3824   // delete thread without giving it a chance to catch the KillThread exception
3825
3826   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3827       return;
3828   }
3829
3830   if (tso->why_blocked != BlockedOnCCall &&
3831       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3832       unblockThread(cap,tso);
3833   }
3834
3835   tso->what_next = ThreadKilled;
3836 }
3837 #endif
3838
3839 /* -----------------------------------------------------------------------------
3840    raiseExceptionHelper
3841    
3842    This function is called by the raise# primitve, just so that we can
3843    move some of the tricky bits of raising an exception from C-- into
3844    C.  Who knows, it might be a useful re-useable thing here too.
3845    -------------------------------------------------------------------------- */
3846
3847 StgWord
3848 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3849 {
3850     Capability *cap = regTableToCapability(reg);
3851     StgThunk *raise_closure = NULL;
3852     StgPtr p, next;
3853     StgRetInfoTable *info;
3854     //
3855     // This closure represents the expression 'raise# E' where E
3856     // is the exception raise.  It is used to overwrite all the
3857     // thunks which are currently under evaluataion.
3858     //
3859
3860     //    
3861     // LDV profiling: stg_raise_info has THUNK as its closure
3862     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3863     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3864     // 1 does not cause any problem unless profiling is performed.
3865     // However, when LDV profiling goes on, we need to linearly scan
3866     // small object pool, where raise_closure is stored, so we should
3867     // use MIN_UPD_SIZE.
3868     //
3869     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3870     //                                 sizeofW(StgClosure)+1);
3871     //
3872
3873     //
3874     // Walk up the stack, looking for the catch frame.  On the way,
3875     // we update any closures pointed to from update frames with the
3876     // raise closure that we just built.
3877     //
3878     p = tso->sp;
3879     while(1) {
3880         info = get_ret_itbl((StgClosure *)p);
3881         next = p + stack_frame_sizeW((StgClosure *)p);
3882         switch (info->i.type) {
3883             
3884         case UPDATE_FRAME:
3885             // Only create raise_closure if we need to.
3886             if (raise_closure == NULL) {
3887                 raise_closure = 
3888                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3889                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3890                 raise_closure->payload[0] = exception;
3891             }
3892             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3893             p = next;
3894             continue;
3895
3896         case ATOMICALLY_FRAME:
3897             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3898             tso->sp = p;
3899             return ATOMICALLY_FRAME;
3900             
3901         case CATCH_FRAME:
3902             tso->sp = p;
3903             return CATCH_FRAME;
3904
3905         case CATCH_STM_FRAME:
3906             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3907             tso->sp = p;
3908             return CATCH_STM_FRAME;
3909             
3910         case STOP_FRAME:
3911             tso->sp = p;
3912             return STOP_FRAME;
3913
3914         case CATCH_RETRY_FRAME:
3915         default:
3916             p = next; 
3917             continue;
3918         }
3919     }
3920 }
3921
3922
3923 /* -----------------------------------------------------------------------------
3924    findRetryFrameHelper
3925
3926    This function is called by the retry# primitive.  It traverses the stack
3927    leaving tso->sp referring to the frame which should handle the retry.  
3928
3929    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3930    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3931
3932    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3933    despite the similar implementation.
3934
3935    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3936    not be created within memory transactions.
3937    -------------------------------------------------------------------------- */
3938
3939 StgWord
3940 findRetryFrameHelper (StgTSO *tso)
3941 {
3942   StgPtr           p, next;
3943   StgRetInfoTable *info;
3944
3945   p = tso -> sp;
3946   while (1) {
3947     info = get_ret_itbl((StgClosure *)p);
3948     next = p + stack_frame_sizeW((StgClosure *)p);
3949     switch (info->i.type) {
3950       
3951     case ATOMICALLY_FRAME:
3952       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3953       tso->sp = p;
3954       return ATOMICALLY_FRAME;
3955       
3956     case CATCH_RETRY_FRAME:
3957       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3958       tso->sp = p;
3959       return CATCH_RETRY_FRAME;
3960       
3961     case CATCH_STM_FRAME:
3962     default:
3963       ASSERT(info->i.type != CATCH_FRAME);
3964       ASSERT(info->i.type != STOP_FRAME);
3965       p = next; 
3966       continue;
3967     }
3968   }
3969 }
3970
3971 /* -----------------------------------------------------------------------------
3972    resurrectThreads is called after garbage collection on the list of
3973    threads found to be garbage.  Each of these threads will be woken
3974    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3975    on an MVar, or NonTermination if the thread was blocked on a Black
3976    Hole.
3977
3978    Locks: assumes we hold *all* the capabilities.
3979    -------------------------------------------------------------------------- */
3980
3981 void
3982 resurrectThreads (StgTSO *threads)
3983 {
3984     StgTSO *tso, *next;
3985     Capability *cap;
3986
3987     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3988         next = tso->global_link;
3989         tso->global_link = all_threads;
3990         all_threads = tso;
3991         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3992         
3993         // Wake up the thread on the Capability it was last on for a
3994         // bound thread, or last_free_capability otherwise.
3995         if (tso->bound) {
3996             cap = tso->bound->cap;
3997         } else {
3998             cap = last_free_capability;
3999         }
4000         
4001         switch (tso->why_blocked) {
4002         case BlockedOnMVar:
4003         case BlockedOnException:
4004             /* Called by GC - sched_mutex lock is currently held. */
4005             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4006             break;
4007         case BlockedOnBlackHole:
4008             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4009             break;
4010         case BlockedOnSTM:
4011             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4012             break;
4013         case NotBlocked:
4014             /* This might happen if the thread was blocked on a black hole
4015              * belonging to a thread that we've just woken up (raiseAsync
4016              * can wake up threads, remember...).
4017              */
4018             continue;
4019         default:
4020             barf("resurrectThreads: thread blocked in a strange way");
4021         }
4022     }
4023 }
4024
4025 /* ----------------------------------------------------------------------------
4026  * Debugging: why is a thread blocked
4027  * [Also provides useful information when debugging threaded programs
4028  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4029    ------------------------------------------------------------------------- */
4030
4031 #if DEBUG
4032 static void
4033 printThreadBlockage(StgTSO *tso)
4034 {
4035   switch (tso->why_blocked) {
4036   case BlockedOnRead:
4037     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4038     break;
4039   case BlockedOnWrite:
4040     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4041     break;
4042 #if defined(mingw32_HOST_OS)
4043     case BlockedOnDoProc:
4044     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4045     break;
4046 #endif
4047   case BlockedOnDelay:
4048     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4049     break;
4050   case BlockedOnMVar:
4051     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4052     break;
4053   case BlockedOnException:
4054     debugBelch("is blocked on delivering an exception to thread %d",
4055             tso->block_info.tso->id);
4056     break;
4057   case BlockedOnBlackHole:
4058     debugBelch("is blocked on a black hole");
4059     break;
4060   case NotBlocked:
4061     debugBelch("is not blocked");
4062     break;
4063 #if defined(PARALLEL_HASKELL)
4064   case BlockedOnGA:
4065     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4066             tso->block_info.closure, info_type(tso->block_info.closure));
4067     break;
4068   case BlockedOnGA_NoSend:
4069     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4070             tso->block_info.closure, info_type(tso->block_info.closure));
4071     break;
4072 #endif
4073   case BlockedOnCCall:
4074     debugBelch("is blocked on an external call");
4075     break;
4076   case BlockedOnCCall_NoUnblockExc:
4077     debugBelch("is blocked on an external call (exceptions were already blocked)");
4078     break;
4079   case BlockedOnSTM:
4080     debugBelch("is blocked on an STM operation");
4081     break;
4082   default:
4083     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4084          tso->why_blocked, tso->id, tso);
4085   }
4086 }
4087
4088 void
4089 printThreadStatus(StgTSO *t)
4090 {
4091     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4092     {
4093       void *label = lookupThreadLabel(t->id);
4094       if (label) debugBelch("[\"%s\"] ",(char *)label);
4095     }
4096     if (t->what_next == ThreadRelocated) {
4097         debugBelch("has been relocated...\n");
4098     } else {
4099         switch (t->what_next) {
4100         case ThreadKilled:
4101             debugBelch("has been killed");
4102             break;
4103         case ThreadComplete:
4104             debugBelch("has completed");
4105             break;
4106         default:
4107             printThreadBlockage(t);
4108         }
4109         debugBelch("\n");
4110     }
4111 }
4112
4113 void
4114 printAllThreads(void)
4115 {
4116   StgTSO *t, *next;
4117   nat i;
4118   Capability *cap;
4119
4120 # if defined(GRAN)
4121   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4122   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4123                        time_string, rtsFalse/*no commas!*/);
4124
4125   debugBelch("all threads at [%s]:\n", time_string);
4126 # elif defined(PARALLEL_HASKELL)
4127   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4128   ullong_format_string(CURRENT_TIME,
4129                        time_string, rtsFalse/*no commas!*/);
4130
4131   debugBelch("all threads at [%s]:\n", time_string);
4132 # else
4133   debugBelch("all threads:\n");
4134 # endif
4135
4136   for (i = 0; i < n_capabilities; i++) {
4137       cap = &capabilities[i];
4138       debugBelch("threads on capability %d:\n", cap->no);
4139       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4140           printThreadStatus(t);
4141       }
4142   }
4143
4144   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4145       if (t->why_blocked != NotBlocked) {
4146           printThreadStatus(t);
4147       }
4148       if (t->what_next == ThreadRelocated) {
4149           next = t->link;
4150       } else {
4151           next = t->global_link;
4152       }
4153   }
4154 }
4155
4156 // useful from gdb
4157 void 
4158 printThreadQueue(StgTSO *t)
4159 {
4160     nat i = 0;
4161     for (; t != END_TSO_QUEUE; t = t->link) {
4162         printThreadStatus(t);
4163         i++;
4164     }
4165     debugBelch("%d threads on queue\n", i);
4166 }
4167
4168 /* 
4169    Print a whole blocking queue attached to node (debugging only).
4170 */
4171 # if defined(PARALLEL_HASKELL)
4172 void 
4173 print_bq (StgClosure *node)
4174 {
4175   StgBlockingQueueElement *bqe;
4176   StgTSO *tso;
4177   rtsBool end;
4178
4179   debugBelch("## BQ of closure %p (%s): ",
4180           node, info_type(node));
4181
4182   /* should cover all closures that may have a blocking queue */
4183   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4184          get_itbl(node)->type == FETCH_ME_BQ ||
4185          get_itbl(node)->type == RBH ||
4186          get_itbl(node)->type == MVAR);
4187     
4188   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4189
4190   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4191 }
4192
4193 /* 
4194    Print a whole blocking queue starting with the element bqe.
4195 */
4196 void 
4197 print_bqe (StgBlockingQueueElement *bqe)
4198 {
4199   rtsBool end;
4200
4201   /* 
4202      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4203   */
4204   for (end = (bqe==END_BQ_QUEUE);
4205        !end; // iterate until bqe points to a CONSTR
4206        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4207        bqe = end ? END_BQ_QUEUE : bqe->link) {
4208     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4209     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4210     /* types of closures that may appear in a blocking queue */
4211     ASSERT(get_itbl(bqe)->type == TSO ||           
4212            get_itbl(bqe)->type == BLOCKED_FETCH || 
4213            get_itbl(bqe)->type == CONSTR); 
4214     /* only BQs of an RBH end with an RBH_Save closure */
4215     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4216
4217     switch (get_itbl(bqe)->type) {
4218     case TSO:
4219       debugBelch(" TSO %u (%x),",
4220               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4221       break;
4222     case BLOCKED_FETCH:
4223       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4224               ((StgBlockedFetch *)bqe)->node, 
4225               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4226               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4227               ((StgBlockedFetch *)bqe)->ga.weight);
4228       break;
4229     case CONSTR:
4230       debugBelch(" %s (IP %p),",
4231               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4232                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4233                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4234                "RBH_Save_?"), get_itbl(bqe));
4235       break;
4236     default:
4237       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4238            info_type((StgClosure *)bqe)); // , node, info_type(node));
4239       break;
4240     }
4241   } /* for */
4242   debugBelch("\n");
4243 }
4244 # elif defined(GRAN)
4245 void 
4246 print_bq (StgClosure *node)
4247 {
4248   StgBlockingQueueElement *bqe;
4249   PEs node_loc, tso_loc;
4250   rtsBool end;
4251
4252   /* should cover all closures that may have a blocking queue */
4253   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4254          get_itbl(node)->type == FETCH_ME_BQ ||
4255          get_itbl(node)->type == RBH);
4256     
4257   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4258   node_loc = where_is(node);
4259
4260   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4261           node, info_type(node), node_loc);
4262
4263   /* 
4264      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4265   */
4266   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4267        !end; // iterate until bqe points to a CONSTR
4268        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4269     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4270     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4271     /* types of closures that may appear in a blocking queue */
4272     ASSERT(get_itbl(bqe)->type == TSO ||           
4273            get_itbl(bqe)->type == CONSTR); 
4274     /* only BQs of an RBH end with an RBH_Save closure */
4275     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4276
4277     tso_loc = where_is((StgClosure *)bqe);
4278     switch (get_itbl(bqe)->type) {
4279     case TSO:
4280       debugBelch(" TSO %d (%p) on [PE %d],",
4281               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4282       break;
4283     case CONSTR:
4284       debugBelch(" %s (IP %p),",
4285               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4286                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4287                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4288                "RBH_Save_?"), get_itbl(bqe));
4289       break;
4290     default:
4291       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4292            info_type((StgClosure *)bqe), node, info_type(node));
4293       break;
4294     }
4295   } /* for */
4296   debugBelch("\n");
4297 }
4298 # endif
4299
4300 #if defined(PARALLEL_HASKELL)
4301 static nat
4302 run_queue_len(void)
4303 {
4304     nat i;
4305     StgTSO *tso;
4306     
4307     for (i=0, tso=run_queue_hd; 
4308          tso != END_TSO_QUEUE;
4309          i++, tso=tso->link) {
4310         /* nothing */
4311     }
4312         
4313     return i;
4314 }
4315 #endif
4316
4317 void
4318 sched_belch(char *s, ...)
4319 {
4320     va_list ap;
4321     va_start(ap,s);
4322 #ifdef THREADED_RTS
4323     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4324 #elif defined(PARALLEL_HASKELL)
4325     debugBelch("== ");
4326 #else
4327     debugBelch("sched: ");
4328 #endif
4329     vdebugBelch(s, ap);
4330     debugBelch("\n");
4331     va_end(ap);
4332 }
4333
4334 #endif /* DEBUG */