c4b253f73591046fa515fb9ff0e14044eb6a1f02
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 #ifdef THREADED_RTS
76 #define USED_WHEN_THREADED_RTS
77 #define USED_WHEN_NON_THREADED_RTS STG_UNUSED
78 #else
79 #define USED_WHEN_THREADED_RTS     STG_UNUSED
80 #define USED_WHEN_NON_THREADED_RTS
81 #endif
82
83 #ifdef SMP
84 #define USED_WHEN_SMP
85 #else
86 #define USED_WHEN_SMP STG_UNUSED
87 #endif
88
89 /* -----------------------------------------------------------------------------
90  * Global variables
91  * -------------------------------------------------------------------------- */
92
93 #if defined(GRAN)
94
95 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
96 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
97
98 /* 
99    In GranSim we have a runnable and a blocked queue for each processor.
100    In order to minimise code changes new arrays run_queue_hds/tls
101    are created. run_queue_hd is then a short cut (macro) for
102    run_queue_hds[CurrentProc] (see GranSim.h).
103    -- HWL
104 */
105 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
106 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
107 StgTSO *ccalling_threadss[MAX_PROC];
108 /* We use the same global list of threads (all_threads) in GranSim as in
109    the std RTS (i.e. we are cheating). However, we don't use this list in
110    the GranSim specific code at the moment (so we are only potentially
111    cheating).  */
112
113 #else /* !GRAN */
114
115 #if !defined(THREADED_RTS)
116 // Blocked/sleeping thrads
117 StgTSO *blocked_queue_hd = NULL;
118 StgTSO *blocked_queue_tl = NULL;
119 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
120 #endif
121
122 /* Threads blocked on blackholes.
123  * LOCK: sched_mutex+capability, or all capabilities
124  */
125 StgTSO *blackhole_queue = NULL;
126 #endif
127
128 /* The blackhole_queue should be checked for threads to wake up.  See
129  * Schedule.h for more thorough comment.
130  * LOCK: none (doesn't matter if we miss an update)
131  */
132 rtsBool blackholes_need_checking = rtsFalse;
133
134 /* Linked list of all threads.
135  * Used for detecting garbage collected threads.
136  * LOCK: sched_mutex+capability, or all capabilities
137  */
138 StgTSO *all_threads = NULL;
139
140 /* flag set by signal handler to precipitate a context switch
141  * LOCK: none (just an advisory flag)
142  */
143 int context_switch = 0;
144
145 /* flag that tracks whether we have done any execution in this time slice.
146  * LOCK: currently none, perhaps we should lock (but needs to be
147  * updated in the fast path of the scheduler).
148  */
149 nat recent_activity = ACTIVITY_YES;
150
151 /* if this flag is set as well, give up execution
152  * LOCK: none (changes once, from false->true)
153  */
154 rtsBool interrupted = rtsFalse;
155
156 /* Next thread ID to allocate.
157  * LOCK: sched_mutex
158  */
159 static StgThreadID next_thread_id = 1;
160
161 /* The smallest stack size that makes any sense is:
162  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
163  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
164  *  + 1                       (the closure to enter)
165  *  + 1                       (stg_ap_v_ret)
166  *  + 1                       (spare slot req'd by stg_ap_v_ret)
167  *
168  * A thread with this stack will bomb immediately with a stack
169  * overflow, which will increase its stack size.  
170  */
171 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
172
173 #if defined(GRAN)
174 StgTSO *CurrentTSO;
175 #endif
176
177 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
178  *  exists - earlier gccs apparently didn't.
179  *  -= chak
180  */
181 StgTSO dummy_tso;
182
183 /*
184  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
185  * in an MT setting, needed to signal that a worker thread shouldn't hang around
186  * in the scheduler when it is out of work.
187  */
188 rtsBool shutting_down_scheduler = rtsFalse;
189
190 /*
191  * This mutex protects most of the global scheduler data in
192  * the THREADED_RTS and (inc. SMP) runtime.
193  */
194 #if defined(THREADED_RTS)
195 Mutex sched_mutex;
196 #endif
197
198 #if defined(PARALLEL_HASKELL)
199 StgTSO *LastTSO;
200 rtsTime TimeOfLastYield;
201 rtsBool emitSchedule = rtsTrue;
202 #endif
203
204 /* -----------------------------------------------------------------------------
205  * static function prototypes
206  * -------------------------------------------------------------------------- */
207
208 static Capability *schedule (Capability *initialCapability, Task *task);
209
210 //
211 // These function all encapsulate parts of the scheduler loop, and are
212 // abstracted only to make the structure and control flow of the
213 // scheduler clearer.
214 //
215 static void schedulePreLoop (void);
216 #if defined(SMP)
217 static void schedulePushWork(Capability *cap, Task *task);
218 #endif
219 static void scheduleStartSignalHandlers (Capability *cap);
220 static void scheduleCheckBlockedThreads (Capability *cap);
221 static void scheduleCheckBlackHoles (Capability *cap);
222 static void scheduleDetectDeadlock (Capability *cap, Task *task);
223 #if defined(GRAN)
224 static StgTSO *scheduleProcessEvent(rtsEvent *event);
225 #endif
226 #if defined(PARALLEL_HASKELL)
227 static StgTSO *scheduleSendPendingMessages(void);
228 static void scheduleActivateSpark(void);
229 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
230 #endif
231 #if defined(PAR) || defined(GRAN)
232 static void scheduleGranParReport(void);
233 #endif
234 static void schedulePostRunThread(void);
235 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
236 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
237                                          StgTSO *t);
238 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
239                                     nat prev_what_next );
240 static void scheduleHandleThreadBlocked( StgTSO *t );
241 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
242                                              StgTSO *t );
243 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
244 static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);
245
246 static void unblockThread(Capability *cap, StgTSO *tso);
247 static rtsBool checkBlackHoles(Capability *cap);
248 static void AllRoots(evac_fn evac);
249
250 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
251
252 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
253                         rtsBool stop_at_atomically, StgPtr stop_here);
254
255 static void deleteThread (Capability *cap, StgTSO *tso);
256 static void deleteRunQueue (Capability *cap);
257
258 #ifdef DEBUG
259 static void printThreadBlockage(StgTSO *tso);
260 static void printThreadStatus(StgTSO *tso);
261 void printThreadQueue(StgTSO *tso);
262 #endif
263
264 #if defined(PARALLEL_HASKELL)
265 StgTSO * createSparkThread(rtsSpark spark);
266 StgTSO * activateSpark (rtsSpark spark);  
267 #endif
268
269 #ifdef DEBUG
270 static char *whatNext_strs[] = {
271   "(unknown)",
272   "ThreadRunGHC",
273   "ThreadInterpret",
274   "ThreadKilled",
275   "ThreadRelocated",
276   "ThreadComplete"
277 };
278 #endif
279
280 /* -----------------------------------------------------------------------------
281  * Putting a thread on the run queue: different scheduling policies
282  * -------------------------------------------------------------------------- */
283
284 STATIC_INLINE void
285 addToRunQueue( Capability *cap, StgTSO *t )
286 {
287 #if defined(PARALLEL_HASKELL)
288     if (RtsFlags.ParFlags.doFairScheduling) { 
289         // this does round-robin scheduling; good for concurrency
290         appendToRunQueue(cap,t);
291     } else {
292         // this does unfair scheduling; good for parallelism
293         pushOnRunQueue(cap,t);
294     }
295 #else
296     // this does round-robin scheduling; good for concurrency
297     appendToRunQueue(cap,t);
298 #endif
299 }
300
301 /* ---------------------------------------------------------------------------
302    Main scheduling loop.
303
304    We use round-robin scheduling, each thread returning to the
305    scheduler loop when one of these conditions is detected:
306
307       * out of heap space
308       * timer expires (thread yields)
309       * thread blocks
310       * thread ends
311       * stack overflow
312
313    GRAN version:
314      In a GranSim setup this loop iterates over the global event queue.
315      This revolves around the global event queue, which determines what 
316      to do next. Therefore, it's more complicated than either the 
317      concurrent or the parallel (GUM) setup.
318
319    GUM version:
320      GUM iterates over incoming messages.
321      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
322      and sends out a fish whenever it has nothing to do; in-between
323      doing the actual reductions (shared code below) it processes the
324      incoming messages and deals with delayed operations 
325      (see PendingFetches).
326      This is not the ugliest code you could imagine, but it's bloody close.
327
328    ------------------------------------------------------------------------ */
329
330 static Capability *
331 schedule (Capability *initialCapability, Task *task)
332 {
333   StgTSO *t;
334   Capability *cap;
335   StgThreadReturnCode ret;
336 #if defined(GRAN)
337   rtsEvent *event;
338 #elif defined(PARALLEL_HASKELL)
339   StgTSO *tso;
340   GlobalTaskId pe;
341   rtsBool receivedFinish = rtsFalse;
342 # if defined(DEBUG)
343   nat tp_size, sp_size; // stats only
344 # endif
345 #endif
346   nat prev_what_next;
347   rtsBool ready_to_gc;
348 #if defined(THREADED_RTS)
349   rtsBool first = rtsTrue;
350 #endif
351   
352   cap = initialCapability;
353
354   // Pre-condition: this task owns initialCapability.
355   // The sched_mutex is *NOT* held
356   // NB. on return, we still hold a capability.
357
358   IF_DEBUG(scheduler,
359            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
360                        task, initialCapability);
361       );
362
363   schedulePreLoop();
364
365   // -----------------------------------------------------------
366   // Scheduler loop starts here:
367
368 #if defined(PARALLEL_HASKELL)
369 #define TERMINATION_CONDITION        (!receivedFinish)
370 #elif defined(GRAN)
371 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
372 #else
373 #define TERMINATION_CONDITION        rtsTrue
374 #endif
375
376   while (TERMINATION_CONDITION) {
377
378 #if defined(GRAN)
379       /* Choose the processor with the next event */
380       CurrentProc = event->proc;
381       CurrentTSO = event->tso;
382 #endif
383
384 #if defined(THREADED_RTS)
385       if (first) {
386           // don't yield the first time, we want a chance to run this
387           // thread for a bit, even if there are others banging at the
388           // door.
389           first = rtsFalse;
390           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
391       } else {
392           // Yield the capability to higher-priority tasks if necessary.
393           yieldCapability(&cap, task);
394       }
395 #endif
396       
397 #ifdef SMP
398       schedulePushWork(cap,task);
399 #endif
400
401     // Check whether we have re-entered the RTS from Haskell without
402     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
403     // call).
404     if (cap->in_haskell) {
405           errorBelch("schedule: re-entered unsafely.\n"
406                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
407           stg_exit(EXIT_FAILURE);
408     }
409
410     //
411     // Test for interruption.  If interrupted==rtsTrue, then either
412     // we received a keyboard interrupt (^C), or the scheduler is
413     // trying to shut down all the tasks (shutting_down_scheduler) in
414     // the threaded RTS.
415     //
416     if (interrupted) {
417         deleteRunQueue(cap);
418 #if defined(SMP)
419         discardSparksCap(cap);
420 #endif
421         if (shutting_down_scheduler) {
422             IF_DEBUG(scheduler, sched_belch("shutting down"));
423             // If we are a worker, just exit.  If we're a bound thread
424             // then we will exit below when we've removed our TSO from
425             // the run queue.
426             if (task->tso == NULL && emptyRunQueue(cap)) {
427                 return cap;
428             }
429         } else {
430             IF_DEBUG(scheduler, sched_belch("interrupted"));
431         }
432     }
433
434 #if defined(SMP)
435     // If the run queue is empty, take a spark and turn it into a thread.
436     {
437         if (emptyRunQueue(cap)) {
438             StgClosure *spark;
439             spark = findSpark(cap);
440             if (spark != NULL) {
441                 IF_DEBUG(scheduler,
442                          sched_belch("turning spark of closure %p into a thread",
443                                      (StgClosure *)spark));
444                 createSparkThread(cap,spark);     
445             }
446         }
447     }
448 #endif // SMP
449
450     scheduleStartSignalHandlers(cap);
451
452     // Only check the black holes here if we've nothing else to do.
453     // During normal execution, the black hole list only gets checked
454     // at GC time, to avoid repeatedly traversing this possibly long
455     // list each time around the scheduler.
456     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
457
458     scheduleCheckBlockedThreads(cap);
459
460     scheduleDetectDeadlock(cap,task);
461
462     // Normally, the only way we can get here with no threads to
463     // run is if a keyboard interrupt received during 
464     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
465     // Additionally, it is not fatal for the
466     // threaded RTS to reach here with no threads to run.
467     //
468     // win32: might be here due to awaitEvent() being abandoned
469     // as a result of a console event having been delivered.
470     if ( emptyRunQueue(cap) ) {
471 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
472         ASSERT(interrupted);
473 #endif
474         continue; // nothing to do
475     }
476
477 #if defined(PARALLEL_HASKELL)
478     scheduleSendPendingMessages();
479     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
480         continue;
481
482 #if defined(SPARKS)
483     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
484 #endif
485
486     /* If we still have no work we need to send a FISH to get a spark
487        from another PE */
488     if (emptyRunQueue(cap)) {
489         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
490         ASSERT(rtsFalse); // should not happen at the moment
491     }
492     // from here: non-empty run queue.
493     //  TODO: merge above case with this, only one call processMessages() !
494     if (PacketsWaiting()) {  /* process incoming messages, if
495                                 any pending...  only in else
496                                 because getRemoteWork waits for
497                                 messages as well */
498         receivedFinish = processMessages();
499     }
500 #endif
501
502 #if defined(GRAN)
503     scheduleProcessEvent(event);
504 #endif
505
506     // 
507     // Get a thread to run
508     //
509     t = popRunQueue(cap);
510
511 #if defined(GRAN) || defined(PAR)
512     scheduleGranParReport(); // some kind of debuging output
513 #else
514     // Sanity check the thread we're about to run.  This can be
515     // expensive if there is lots of thread switching going on...
516     IF_DEBUG(sanity,checkTSO(t));
517 #endif
518
519 #if defined(THREADED_RTS)
520     // Check whether we can run this thread in the current task.
521     // If not, we have to pass our capability to the right task.
522     {
523         Task *bound = t->bound;
524       
525         if (bound) {
526             if (bound == task) {
527                 IF_DEBUG(scheduler,
528                          sched_belch("### Running thread %d in bound thread",
529                                      t->id));
530                 // yes, the Haskell thread is bound to the current native thread
531             } else {
532                 IF_DEBUG(scheduler,
533                          sched_belch("### thread %d bound to another OS thread",
534                                      t->id));
535                 // no, bound to a different Haskell thread: pass to that thread
536                 pushOnRunQueue(cap,t);
537                 continue;
538             }
539         } else {
540             // The thread we want to run is unbound.
541             if (task->tso) { 
542                 IF_DEBUG(scheduler,
543                          sched_belch("### this OS thread cannot run thread %d", t->id));
544                 // no, the current native thread is bound to a different
545                 // Haskell thread, so pass it to any worker thread
546                 pushOnRunQueue(cap,t);
547                 continue; 
548             }
549         }
550     }
551 #endif
552
553     cap->r.rCurrentTSO = t;
554     
555     /* context switches are initiated by the timer signal, unless
556      * the user specified "context switch as often as possible", with
557      * +RTS -C0
558      */
559     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
560         && !emptyThreadQueues(cap)) {
561         context_switch = 1;
562     }
563          
564 run_thread:
565
566     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
567                               (long)t->id, whatNext_strs[t->what_next]));
568
569 #if defined(PROFILING)
570     startHeapProfTimer();
571 #endif
572
573     // ----------------------------------------------------------------------
574     // Run the current thread 
575
576     prev_what_next = t->what_next;
577
578     errno = t->saved_errno;
579     cap->in_haskell = rtsTrue;
580
581     recent_activity = ACTIVITY_YES;
582
583     switch (prev_what_next) {
584         
585     case ThreadKilled:
586     case ThreadComplete:
587         /* Thread already finished, return to scheduler. */
588         ret = ThreadFinished;
589         break;
590         
591     case ThreadRunGHC:
592     {
593         StgRegTable *r;
594         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
595         cap = regTableToCapability(r);
596         ret = r->rRet;
597         break;
598     }
599     
600     case ThreadInterpret:
601         cap = interpretBCO(cap);
602         ret = cap->r.rRet;
603         break;
604         
605     default:
606         barf("schedule: invalid what_next field");
607     }
608
609     cap->in_haskell = rtsFalse;
610
611     // The TSO might have moved, eg. if it re-entered the RTS and a GC
612     // happened.  So find the new location:
613     t = cap->r.rCurrentTSO;
614
615     // We have run some Haskell code: there might be blackhole-blocked
616     // threads to wake up now.
617     // Lock-free test here should be ok, we're just setting a flag.
618     if ( blackhole_queue != END_TSO_QUEUE ) {
619         blackholes_need_checking = rtsTrue;
620     }
621     
622     // And save the current errno in this thread.
623     // XXX: possibly bogus for SMP because this thread might already
624     // be running again, see code below.
625     t->saved_errno = errno;
626
627 #ifdef SMP
628     // If ret is ThreadBlocked, and this Task is bound to the TSO that
629     // blocked, we are in limbo - the TSO is now owned by whatever it
630     // is blocked on, and may in fact already have been woken up,
631     // perhaps even on a different Capability.  It may be the case
632     // that task->cap != cap.  We better yield this Capability
633     // immediately and return to normaility.
634     if (ret == ThreadBlocked) {
635         IF_DEBUG(scheduler,
636                  sched_belch("--<< thread %d (%s) stopped: blocked\n",
637                              t->id, whatNext_strs[t->what_next]));
638         continue;
639     }
640 #endif
641
642     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
643
644     // ----------------------------------------------------------------------
645     
646     // Costs for the scheduler are assigned to CCS_SYSTEM
647 #if defined(PROFILING)
648     stopHeapProfTimer();
649     CCCS = CCS_SYSTEM;
650 #endif
651     
652 #if defined(THREADED_RTS)
653     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
654 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
655     IF_DEBUG(scheduler,debugBelch("sched: "););
656 #endif
657     
658     schedulePostRunThread();
659
660     ready_to_gc = rtsFalse;
661
662     switch (ret) {
663     case HeapOverflow:
664         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
665         break;
666
667     case StackOverflow:
668         scheduleHandleStackOverflow(cap,task,t);
669         break;
670
671     case ThreadYielding:
672         if (scheduleHandleYield(cap, t, prev_what_next)) {
673             // shortcut for switching between compiler/interpreter:
674             goto run_thread; 
675         }
676         break;
677
678     case ThreadBlocked:
679         scheduleHandleThreadBlocked(t);
680         break;
681
682     case ThreadFinished:
683         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
684         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
685         break;
686
687     default:
688       barf("schedule: invalid thread return code %d", (int)ret);
689     }
690
691     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
692     if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
693   } /* end of while() */
694
695   IF_PAR_DEBUG(verbose,
696                debugBelch("== Leaving schedule() after having received Finish\n"));
697 }
698
699 /* ----------------------------------------------------------------------------
700  * Setting up the scheduler loop
701  * ------------------------------------------------------------------------- */
702
703 static void
704 schedulePreLoop(void)
705 {
706 #if defined(GRAN) 
707     /* set up first event to get things going */
708     /* ToDo: assign costs for system setup and init MainTSO ! */
709     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
710               ContinueThread, 
711               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
712     
713     IF_DEBUG(gran,
714              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
715                         CurrentTSO);
716              G_TSO(CurrentTSO, 5));
717     
718     if (RtsFlags.GranFlags.Light) {
719         /* Save current time; GranSim Light only */
720         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
721     }      
722 #endif
723 }
724
725 /* -----------------------------------------------------------------------------
726  * schedulePushWork()
727  *
728  * Push work to other Capabilities if we have some.
729  * -------------------------------------------------------------------------- */
730
731 #ifdef SMP
732 static void
733 schedulePushWork(Capability *cap USED_WHEN_SMP, 
734                  Task *task      USED_WHEN_SMP)
735 {
736     Capability *free_caps[n_capabilities], *cap0;
737     nat i, n_free_caps;
738
739     // Check whether we have more threads on our run queue, or sparks
740     // in our pool, that we could hand to another Capability.
741     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
742         && sparkPoolSizeCap(cap) < 2) {
743         return;
744     }
745
746     // First grab as many free Capabilities as we can.
747     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
748         cap0 = &capabilities[i];
749         if (cap != cap0 && tryGrabCapability(cap0,task)) {
750             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
751                 // it already has some work, we just grabbed it at 
752                 // the wrong moment.  Or maybe it's deadlocked!
753                 releaseCapability(cap0);
754             } else {
755                 free_caps[n_free_caps++] = cap0;
756             }
757         }
758     }
759
760     // we now have n_free_caps free capabilities stashed in
761     // free_caps[].  Share our run queue equally with them.  This is
762     // probably the simplest thing we could do; improvements we might
763     // want to do include:
764     //
765     //   - giving high priority to moving relatively new threads, on 
766     //     the gournds that they haven't had time to build up a
767     //     working set in the cache on this CPU/Capability.
768     //
769     //   - giving low priority to moving long-lived threads
770
771     if (n_free_caps > 0) {
772         StgTSO *prev, *t, *next;
773         rtsBool pushed_to_all;
774
775         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
776
777         i = 0;
778         pushed_to_all = rtsFalse;
779
780         if (cap->run_queue_hd != END_TSO_QUEUE) {
781             prev = cap->run_queue_hd;
782             t = prev->link;
783             prev->link = END_TSO_QUEUE;
784             for (; t != END_TSO_QUEUE; t = next) {
785                 next = t->link;
786                 t->link = END_TSO_QUEUE;
787                 if (t->what_next == ThreadRelocated
788                     || t->bound == task) { // don't move my bound thread
789                     prev->link = t;
790                     prev = t;
791                 } else if (i == n_free_caps) {
792                     pushed_to_all = rtsTrue;
793                     i = 0;
794                     // keep one for us
795                     prev->link = t;
796                     prev = t;
797                 } else {
798                     IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
799                     appendToRunQueue(free_caps[i],t);
800                     if (t->bound) { t->bound->cap = free_caps[i]; }
801                     i++;
802                 }
803             }
804             cap->run_queue_tl = prev;
805         }
806
807         // If there are some free capabilities that we didn't push any
808         // threads to, then try to push a spark to each one.
809         if (!pushed_to_all) {
810             StgClosure *spark;
811             // i is the next free capability to push to
812             for (; i < n_free_caps; i++) {
813                 if (emptySparkPoolCap(free_caps[i])) {
814                     spark = findSpark(cap);
815                     if (spark != NULL) {
816                         IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
817                         newSpark(&(free_caps[i]->r), spark);
818                     }
819                 }
820             }
821         }
822
823         // release the capabilities
824         for (i = 0; i < n_free_caps; i++) {
825             task->cap = free_caps[i];
826             releaseCapability(free_caps[i]);
827         }
828     }
829     task->cap = cap; // reset to point to our Capability.
830 }
831 #endif
832
833 /* ----------------------------------------------------------------------------
834  * Start any pending signal handlers
835  * ------------------------------------------------------------------------- */
836
837 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
838 static void
839 scheduleStartSignalHandlers(Capability *cap)
840 {
841     if (signals_pending()) { // safe outside the lock
842         startSignalHandlers(cap);
843     }
844 }
845 #else
846 static void
847 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
848 {
849 }
850 #endif
851
852 /* ----------------------------------------------------------------------------
853  * Check for blocked threads that can be woken up.
854  * ------------------------------------------------------------------------- */
855
856 static void
857 scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
858 {
859 #if !defined(THREADED_RTS)
860     //
861     // Check whether any waiting threads need to be woken up.  If the
862     // run queue is empty, and there are no other tasks running, we
863     // can wait indefinitely for something to happen.
864     //
865     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
866     {
867         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
868     }
869 #endif
870 }
871
872
873 /* ----------------------------------------------------------------------------
874  * Check for threads blocked on BLACKHOLEs that can be woken up
875  * ------------------------------------------------------------------------- */
876 static void
877 scheduleCheckBlackHoles (Capability *cap)
878 {
879     if ( blackholes_need_checking ) // check without the lock first
880     {
881         ACQUIRE_LOCK(&sched_mutex);
882         if ( blackholes_need_checking ) {
883             checkBlackHoles(cap);
884             blackholes_need_checking = rtsFalse;
885         }
886         RELEASE_LOCK(&sched_mutex);
887     }
888 }
889
890 /* ----------------------------------------------------------------------------
891  * Detect deadlock conditions and attempt to resolve them.
892  * ------------------------------------------------------------------------- */
893
894 static void
895 scheduleDetectDeadlock (Capability *cap, Task *task)
896 {
897
898 #if defined(PARALLEL_HASKELL)
899     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
900     return;
901 #endif
902
903     /* 
904      * Detect deadlock: when we have no threads to run, there are no
905      * threads blocked, waiting for I/O, or sleeping, and all the
906      * other tasks are waiting for work, we must have a deadlock of
907      * some description.
908      */
909     if ( emptyThreadQueues(cap) )
910     {
911 #if defined(THREADED_RTS)
912         /* 
913          * In the threaded RTS, we only check for deadlock if there
914          * has been no activity in a complete timeslice.  This means
915          * we won't eagerly start a full GC just because we don't have
916          * any threads to run currently.
917          */
918         if (recent_activity != ACTIVITY_INACTIVE) return;
919 #endif
920
921         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
922
923         // Garbage collection can release some new threads due to
924         // either (a) finalizers or (b) threads resurrected because
925         // they are unreachable and will therefore be sent an
926         // exception.  Any threads thus released will be immediately
927         // runnable.
928         scheduleDoGC( cap, task, rtsTrue/*force  major GC*/ );
929         recent_activity = ACTIVITY_DONE_GC;
930         
931         if ( !emptyRunQueue(cap) ) return;
932
933 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
934         /* If we have user-installed signal handlers, then wait
935          * for signals to arrive rather then bombing out with a
936          * deadlock.
937          */
938         if ( anyUserHandlers() ) {
939             IF_DEBUG(scheduler, 
940                      sched_belch("still deadlocked, waiting for signals..."));
941
942             awaitUserSignals();
943
944             if (signals_pending()) {
945                 startSignalHandlers(cap);
946             }
947
948             // either we have threads to run, or we were interrupted:
949             ASSERT(!emptyRunQueue(cap) || interrupted);
950         }
951 #endif
952
953 #if !defined(THREADED_RTS)
954         /* Probably a real deadlock.  Send the current main thread the
955          * Deadlock exception.
956          */
957         if (task->tso) {
958             switch (task->tso->why_blocked) {
959             case BlockedOnSTM:
960             case BlockedOnBlackHole:
961             case BlockedOnException:
962             case BlockedOnMVar:
963                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
964                 return;
965             default:
966                 barf("deadlock: main thread blocked in a strange way");
967             }
968         }
969         return;
970 #endif
971     }
972 }
973
974 /* ----------------------------------------------------------------------------
975  * Process an event (GRAN only)
976  * ------------------------------------------------------------------------- */
977
978 #if defined(GRAN)
979 static StgTSO *
980 scheduleProcessEvent(rtsEvent *event)
981 {
982     StgTSO *t;
983
984     if (RtsFlags.GranFlags.Light)
985       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
986
987     /* adjust time based on time-stamp */
988     if (event->time > CurrentTime[CurrentProc] &&
989         event->evttype != ContinueThread)
990       CurrentTime[CurrentProc] = event->time;
991     
992     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
993     if (!RtsFlags.GranFlags.Light)
994       handleIdlePEs();
995
996     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
997
998     /* main event dispatcher in GranSim */
999     switch (event->evttype) {
1000       /* Should just be continuing execution */
1001     case ContinueThread:
1002       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1003       /* ToDo: check assertion
1004       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1005              run_queue_hd != END_TSO_QUEUE);
1006       */
1007       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1008       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1009           procStatus[CurrentProc]==Fetching) {
1010         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1011               CurrentTSO->id, CurrentTSO, CurrentProc);
1012         goto next_thread;
1013       } 
1014       /* Ignore ContinueThreads for completed threads */
1015       if (CurrentTSO->what_next == ThreadComplete) {
1016         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1017               CurrentTSO->id, CurrentTSO, CurrentProc);
1018         goto next_thread;
1019       } 
1020       /* Ignore ContinueThreads for threads that are being migrated */
1021       if (PROCS(CurrentTSO)==Nowhere) { 
1022         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1023               CurrentTSO->id, CurrentTSO, CurrentProc);
1024         goto next_thread;
1025       }
1026       /* The thread should be at the beginning of the run queue */
1027       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1028         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1029               CurrentTSO->id, CurrentTSO, CurrentProc);
1030         break; // run the thread anyway
1031       }
1032       /*
1033       new_event(proc, proc, CurrentTime[proc],
1034                 FindWork,
1035                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1036       goto next_thread; 
1037       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1038       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1039
1040     case FetchNode:
1041       do_the_fetchnode(event);
1042       goto next_thread;             /* handle next event in event queue  */
1043       
1044     case GlobalBlock:
1045       do_the_globalblock(event);
1046       goto next_thread;             /* handle next event in event queue  */
1047       
1048     case FetchReply:
1049       do_the_fetchreply(event);
1050       goto next_thread;             /* handle next event in event queue  */
1051       
1052     case UnblockThread:   /* Move from the blocked queue to the tail of */
1053       do_the_unblock(event);
1054       goto next_thread;             /* handle next event in event queue  */
1055       
1056     case ResumeThread:  /* Move from the blocked queue to the tail of */
1057       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1058       event->tso->gran.blocktime += 
1059         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1060       do_the_startthread(event);
1061       goto next_thread;             /* handle next event in event queue  */
1062       
1063     case StartThread:
1064       do_the_startthread(event);
1065       goto next_thread;             /* handle next event in event queue  */
1066       
1067     case MoveThread:
1068       do_the_movethread(event);
1069       goto next_thread;             /* handle next event in event queue  */
1070       
1071     case MoveSpark:
1072       do_the_movespark(event);
1073       goto next_thread;             /* handle next event in event queue  */
1074       
1075     case FindWork:
1076       do_the_findwork(event);
1077       goto next_thread;             /* handle next event in event queue  */
1078       
1079     default:
1080       barf("Illegal event type %u\n", event->evttype);
1081     }  /* switch */
1082     
1083     /* This point was scheduler_loop in the old RTS */
1084
1085     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1086
1087     TimeOfLastEvent = CurrentTime[CurrentProc];
1088     TimeOfNextEvent = get_time_of_next_event();
1089     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1090     // CurrentTSO = ThreadQueueHd;
1091
1092     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1093                          TimeOfNextEvent));
1094
1095     if (RtsFlags.GranFlags.Light) 
1096       GranSimLight_leave_system(event, &ActiveTSO); 
1097
1098     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1099
1100     IF_DEBUG(gran, 
1101              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1102
1103     /* in a GranSim setup the TSO stays on the run queue */
1104     t = CurrentTSO;
1105     /* Take a thread from the run queue. */
1106     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1107
1108     IF_DEBUG(gran, 
1109              debugBelch("GRAN: About to run current thread, which is\n");
1110              G_TSO(t,5));
1111
1112     context_switch = 0; // turned on via GranYield, checking events and time slice
1113
1114     IF_DEBUG(gran, 
1115              DumpGranEvent(GR_SCHEDULE, t));
1116
1117     procStatus[CurrentProc] = Busy;
1118 }
1119 #endif // GRAN
1120
1121 /* ----------------------------------------------------------------------------
1122  * Send pending messages (PARALLEL_HASKELL only)
1123  * ------------------------------------------------------------------------- */
1124
1125 #if defined(PARALLEL_HASKELL)
1126 static StgTSO *
1127 scheduleSendPendingMessages(void)
1128 {
1129     StgSparkPool *pool;
1130     rtsSpark spark;
1131     StgTSO *t;
1132
1133 # if defined(PAR) // global Mem.Mgmt., omit for now
1134     if (PendingFetches != END_BF_QUEUE) {
1135         processFetches();
1136     }
1137 # endif
1138     
1139     if (RtsFlags.ParFlags.BufferTime) {
1140         // if we use message buffering, we must send away all message
1141         // packets which have become too old...
1142         sendOldBuffers(); 
1143     }
1144 }
1145 #endif
1146
1147 /* ----------------------------------------------------------------------------
1148  * Activate spark threads (PARALLEL_HASKELL only)
1149  * ------------------------------------------------------------------------- */
1150
1151 #if defined(PARALLEL_HASKELL)
1152 static void
1153 scheduleActivateSpark(void)
1154 {
1155 #if defined(SPARKS)
1156   ASSERT(emptyRunQueue());
1157 /* We get here if the run queue is empty and want some work.
1158    We try to turn a spark into a thread, and add it to the run queue,
1159    from where it will be picked up in the next iteration of the scheduler
1160    loop.
1161 */
1162
1163       /* :-[  no local threads => look out for local sparks */
1164       /* the spark pool for the current PE */
1165       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1166       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1167           pool->hd < pool->tl) {
1168         /* 
1169          * ToDo: add GC code check that we really have enough heap afterwards!!
1170          * Old comment:
1171          * If we're here (no runnable threads) and we have pending
1172          * sparks, we must have a space problem.  Get enough space
1173          * to turn one of those pending sparks into a
1174          * thread... 
1175          */
1176
1177         spark = findSpark(rtsFalse);            /* get a spark */
1178         if (spark != (rtsSpark) NULL) {
1179           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1180           IF_PAR_DEBUG(fish, // schedule,
1181                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1182                              tso->id, tso, advisory_thread_count));
1183
1184           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1185             IF_PAR_DEBUG(fish, // schedule,
1186                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1187                             spark));
1188             return rtsFalse; /* failed to generate a thread */
1189           }                  /* otherwise fall through & pick-up new tso */
1190         } else {
1191           IF_PAR_DEBUG(fish, // schedule,
1192                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1193                              spark_queue_len(pool)));
1194           return rtsFalse;  /* failed to generate a thread */
1195         }
1196         return rtsTrue;  /* success in generating a thread */
1197   } else { /* no more threads permitted or pool empty */
1198     return rtsFalse;  /* failed to generateThread */
1199   }
1200 #else
1201   tso = NULL; // avoid compiler warning only
1202   return rtsFalse;  /* dummy in non-PAR setup */
1203 #endif // SPARKS
1204 }
1205 #endif // PARALLEL_HASKELL
1206
1207 /* ----------------------------------------------------------------------------
1208  * Get work from a remote node (PARALLEL_HASKELL only)
1209  * ------------------------------------------------------------------------- */
1210     
1211 #if defined(PARALLEL_HASKELL)
1212 static rtsBool
1213 scheduleGetRemoteWork(rtsBool *receivedFinish)
1214 {
1215   ASSERT(emptyRunQueue());
1216
1217   if (RtsFlags.ParFlags.BufferTime) {
1218         IF_PAR_DEBUG(verbose, 
1219                 debugBelch("...send all pending data,"));
1220         {
1221           nat i;
1222           for (i=1; i<=nPEs; i++)
1223             sendImmediately(i); // send all messages away immediately
1224         }
1225   }
1226 # ifndef SPARKS
1227         //++EDEN++ idle() , i.e. send all buffers, wait for work
1228         // suppress fishing in EDEN... just look for incoming messages
1229         // (blocking receive)
1230   IF_PAR_DEBUG(verbose, 
1231                debugBelch("...wait for incoming messages...\n"));
1232   *receivedFinish = processMessages(); // blocking receive...
1233
1234         // and reenter scheduling loop after having received something
1235         // (return rtsFalse below)
1236
1237 # else /* activate SPARKS machinery */
1238 /* We get here, if we have no work, tried to activate a local spark, but still
1239    have no work. We try to get a remote spark, by sending a FISH message.
1240    Thread migration should be added here, and triggered when a sequence of 
1241    fishes returns without work. */
1242         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1243
1244       /* =8-[  no local sparks => look for work on other PEs */
1245         /*
1246          * We really have absolutely no work.  Send out a fish
1247          * (there may be some out there already), and wait for
1248          * something to arrive.  We clearly can't run any threads
1249          * until a SCHEDULE or RESUME arrives, and so that's what
1250          * we're hoping to see.  (Of course, we still have to
1251          * respond to other types of messages.)
1252          */
1253         rtsTime now = msTime() /*CURRENT_TIME*/;
1254         IF_PAR_DEBUG(verbose, 
1255                      debugBelch("--  now=%ld\n", now));
1256         IF_PAR_DEBUG(fish, // verbose,
1257              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1258                  (last_fish_arrived_at!=0 &&
1259                   last_fish_arrived_at+delay > now)) {
1260                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1261                      now, last_fish_arrived_at+delay, 
1262                      last_fish_arrived_at,
1263                      delay);
1264              });
1265   
1266         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1267             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1268           if (last_fish_arrived_at==0 ||
1269               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1270             /* outstandingFishes is set in sendFish, processFish;
1271                avoid flooding system with fishes via delay */
1272     next_fish_to_send_at = 0;  
1273   } else {
1274     /* ToDo: this should be done in the main scheduling loop to avoid the
1275              busy wait here; not so bad if fish delay is very small  */
1276     int iq = 0; // DEBUGGING -- HWL
1277     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1278     /* send a fish when ready, but process messages that arrive in the meantime */
1279     do {
1280       if (PacketsWaiting()) {
1281         iq++; // DEBUGGING
1282         *receivedFinish = processMessages();
1283       }
1284       now = msTime();
1285     } while (!*receivedFinish || now<next_fish_to_send_at);
1286     // JB: This means the fish could become obsolete, if we receive
1287     // work. Better check for work again? 
1288     // last line: while (!receivedFinish || !haveWork || now<...)
1289     // next line: if (receivedFinish || haveWork )
1290
1291     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1292       return rtsFalse;  // NB: this will leave scheduler loop
1293                         // immediately after return!
1294                           
1295     IF_PAR_DEBUG(fish, // verbose,
1296                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1297
1298   }
1299
1300     // JB: IMHO, this should all be hidden inside sendFish(...)
1301     /* pe = choosePE(); 
1302        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1303                 NEW_FISH_HUNGER);
1304
1305     // Global statistics: count no. of fishes
1306     if (RtsFlags.ParFlags.ParStats.Global &&
1307          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1308            globalParStats.tot_fish_mess++;
1309            }
1310     */ 
1311
1312   /* delayed fishes must have been sent by now! */
1313   next_fish_to_send_at = 0;  
1314   }
1315       
1316   *receivedFinish = processMessages();
1317 # endif /* SPARKS */
1318
1319  return rtsFalse;
1320  /* NB: this function always returns rtsFalse, meaning the scheduler
1321     loop continues with the next iteration; 
1322     rationale: 
1323       return code means success in finding work; we enter this function
1324       if there is no local work, thus have to send a fish which takes
1325       time until it arrives with work; in the meantime we should process
1326       messages in the main loop;
1327  */
1328 }
1329 #endif // PARALLEL_HASKELL
1330
1331 /* ----------------------------------------------------------------------------
1332  * PAR/GRAN: Report stats & debugging info(?)
1333  * ------------------------------------------------------------------------- */
1334
1335 #if defined(PAR) || defined(GRAN)
1336 static void
1337 scheduleGranParReport(void)
1338 {
1339   ASSERT(run_queue_hd != END_TSO_QUEUE);
1340
1341   /* Take a thread from the run queue, if we have work */
1342   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1343
1344     /* If this TSO has got its outport closed in the meantime, 
1345      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1346      * It has to be marked as TH_DEAD for this purpose.
1347      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1348
1349 JB: TODO: investigate wether state change field could be nuked
1350      entirely and replaced by the normal tso state (whatnext
1351      field). All we want to do is to kill tsos from outside.
1352      */
1353
1354     /* ToDo: write something to the log-file
1355     if (RTSflags.ParFlags.granSimStats && !sameThread)
1356         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1357
1358     CurrentTSO = t;
1359     */
1360     /* the spark pool for the current PE */
1361     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1362
1363     IF_DEBUG(scheduler, 
1364              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1365                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1366
1367     IF_PAR_DEBUG(fish,
1368              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1369                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1370
1371     if (RtsFlags.ParFlags.ParStats.Full && 
1372         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1373         (emitSchedule || // forced emit
1374          (t && LastTSO && t->id != LastTSO->id))) {
1375       /* 
1376          we are running a different TSO, so write a schedule event to log file
1377          NB: If we use fair scheduling we also have to write  a deschedule 
1378              event for LastTSO; with unfair scheduling we know that the
1379              previous tso has blocked whenever we switch to another tso, so
1380              we don't need it in GUM for now
1381       */
1382       IF_PAR_DEBUG(fish, // schedule,
1383                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1384
1385       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1386                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1387       emitSchedule = rtsFalse;
1388     }
1389 }     
1390 #endif
1391
1392 /* ----------------------------------------------------------------------------
1393  * After running a thread...
1394  * ------------------------------------------------------------------------- */
1395
1396 static void
1397 schedulePostRunThread(void)
1398 {
1399 #if defined(PAR)
1400     /* HACK 675: if the last thread didn't yield, make sure to print a 
1401        SCHEDULE event to the log file when StgRunning the next thread, even
1402        if it is the same one as before */
1403     LastTSO = t; 
1404     TimeOfLastYield = CURRENT_TIME;
1405 #endif
1406
1407   /* some statistics gathering in the parallel case */
1408
1409 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1410   switch (ret) {
1411     case HeapOverflow:
1412 # if defined(GRAN)
1413       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1414       globalGranStats.tot_heapover++;
1415 # elif defined(PAR)
1416       globalParStats.tot_heapover++;
1417 # endif
1418       break;
1419
1420      case StackOverflow:
1421 # if defined(GRAN)
1422       IF_DEBUG(gran, 
1423                DumpGranEvent(GR_DESCHEDULE, t));
1424       globalGranStats.tot_stackover++;
1425 # elif defined(PAR)
1426       // IF_DEBUG(par, 
1427       // DumpGranEvent(GR_DESCHEDULE, t);
1428       globalParStats.tot_stackover++;
1429 # endif
1430       break;
1431
1432     case ThreadYielding:
1433 # if defined(GRAN)
1434       IF_DEBUG(gran, 
1435                DumpGranEvent(GR_DESCHEDULE, t));
1436       globalGranStats.tot_yields++;
1437 # elif defined(PAR)
1438       // IF_DEBUG(par, 
1439       // DumpGranEvent(GR_DESCHEDULE, t);
1440       globalParStats.tot_yields++;
1441 # endif
1442       break; 
1443
1444     case ThreadBlocked:
1445 # if defined(GRAN)
1446       IF_DEBUG(scheduler,
1447                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1448                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1449                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1450                if (t->block_info.closure!=(StgClosure*)NULL)
1451                  print_bq(t->block_info.closure);
1452                debugBelch("\n"));
1453
1454       // ??? needed; should emit block before
1455       IF_DEBUG(gran, 
1456                DumpGranEvent(GR_DESCHEDULE, t)); 
1457       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1458       /*
1459         ngoq Dogh!
1460       ASSERT(procStatus[CurrentProc]==Busy || 
1461               ((procStatus[CurrentProc]==Fetching) && 
1462               (t->block_info.closure!=(StgClosure*)NULL)));
1463       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1464           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1465             procStatus[CurrentProc]==Fetching)) 
1466         procStatus[CurrentProc] = Idle;
1467       */
1468 # elif defined(PAR)
1469 //++PAR++  blockThread() writes the event (change?)
1470 # endif
1471     break;
1472
1473   case ThreadFinished:
1474     break;
1475
1476   default:
1477     barf("parGlobalStats: unknown return code");
1478     break;
1479     }
1480 #endif
1481 }
1482
1483 /* -----------------------------------------------------------------------------
1484  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1485  * -------------------------------------------------------------------------- */
1486
1487 static rtsBool
1488 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1489 {
1490     // did the task ask for a large block?
1491     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1492         // if so, get one and push it on the front of the nursery.
1493         bdescr *bd;
1494         lnat blocks;
1495         
1496         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1497         
1498         IF_DEBUG(scheduler,
1499                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1500                             (long)t->id, whatNext_strs[t->what_next], blocks));
1501         
1502         // don't do this if the nursery is (nearly) full, we'll GC first.
1503         if (cap->r.rCurrentNursery->link != NULL ||
1504             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1505                                                // if the nursery has only one block.
1506             
1507             ACQUIRE_SM_LOCK
1508             bd = allocGroup( blocks );
1509             RELEASE_SM_LOCK
1510             cap->r.rNursery->n_blocks += blocks;
1511             
1512             // link the new group into the list
1513             bd->link = cap->r.rCurrentNursery;
1514             bd->u.back = cap->r.rCurrentNursery->u.back;
1515             if (cap->r.rCurrentNursery->u.back != NULL) {
1516                 cap->r.rCurrentNursery->u.back->link = bd;
1517             } else {
1518 #if !defined(SMP)
1519                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1520                        g0s0 == cap->r.rNursery);
1521 #endif
1522                 cap->r.rNursery->blocks = bd;
1523             }             
1524             cap->r.rCurrentNursery->u.back = bd;
1525             
1526             // initialise it as a nursery block.  We initialise the
1527             // step, gen_no, and flags field of *every* sub-block in
1528             // this large block, because this is easier than making
1529             // sure that we always find the block head of a large
1530             // block whenever we call Bdescr() (eg. evacuate() and
1531             // isAlive() in the GC would both have to do this, at
1532             // least).
1533             { 
1534                 bdescr *x;
1535                 for (x = bd; x < bd + blocks; x++) {
1536                     x->step = cap->r.rNursery;
1537                     x->gen_no = 0;
1538                     x->flags = 0;
1539                 }
1540             }
1541             
1542             // This assert can be a killer if the app is doing lots
1543             // of large block allocations.
1544             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1545             
1546             // now update the nursery to point to the new block
1547             cap->r.rCurrentNursery = bd;
1548             
1549             // we might be unlucky and have another thread get on the
1550             // run queue before us and steal the large block, but in that
1551             // case the thread will just end up requesting another large
1552             // block.
1553             pushOnRunQueue(cap,t);
1554             return rtsFalse;  /* not actually GC'ing */
1555         }
1556     }
1557     
1558     IF_DEBUG(scheduler,
1559              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1560                         (long)t->id, whatNext_strs[t->what_next]));
1561 #if defined(GRAN)
1562     ASSERT(!is_on_queue(t,CurrentProc));
1563 #elif defined(PARALLEL_HASKELL)
1564     /* Currently we emit a DESCHEDULE event before GC in GUM.
1565        ToDo: either add separate event to distinguish SYSTEM time from rest
1566        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1567     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1568         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1569                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1570         emitSchedule = rtsTrue;
1571     }
1572 #endif
1573       
1574     pushOnRunQueue(cap,t);
1575     return rtsTrue;
1576     /* actual GC is done at the end of the while loop in schedule() */
1577 }
1578
1579 /* -----------------------------------------------------------------------------
1580  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1581  * -------------------------------------------------------------------------- */
1582
1583 static void
1584 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1585 {
1586     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1587                                   (long)t->id, whatNext_strs[t->what_next]));
1588     /* just adjust the stack for this thread, then pop it back
1589      * on the run queue.
1590      */
1591     { 
1592         /* enlarge the stack */
1593         StgTSO *new_t = threadStackOverflow(cap, t);
1594         
1595         /* The TSO attached to this Task may have moved, so update the
1596          * pointer to it.
1597          */
1598         if (task->tso == t) {
1599             task->tso = new_t;
1600         }
1601         pushOnRunQueue(cap,new_t);
1602     }
1603 }
1604
1605 /* -----------------------------------------------------------------------------
1606  * Handle a thread that returned to the scheduler with ThreadYielding
1607  * -------------------------------------------------------------------------- */
1608
1609 static rtsBool
1610 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1611 {
1612     // Reset the context switch flag.  We don't do this just before
1613     // running the thread, because that would mean we would lose ticks
1614     // during GC, which can lead to unfair scheduling (a thread hogs
1615     // the CPU because the tick always arrives during GC).  This way
1616     // penalises threads that do a lot of allocation, but that seems
1617     // better than the alternative.
1618     context_switch = 0;
1619     
1620     /* put the thread back on the run queue.  Then, if we're ready to
1621      * GC, check whether this is the last task to stop.  If so, wake
1622      * up the GC thread.  getThread will block during a GC until the
1623      * GC is finished.
1624      */
1625     IF_DEBUG(scheduler,
1626              if (t->what_next != prev_what_next) {
1627                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1628                             (long)t->id, whatNext_strs[t->what_next]);
1629              } else {
1630                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1631                             (long)t->id, whatNext_strs[t->what_next]);
1632              }
1633         );
1634     
1635     IF_DEBUG(sanity,
1636              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1637              checkTSO(t));
1638     ASSERT(t->link == END_TSO_QUEUE);
1639     
1640     // Shortcut if we're just switching evaluators: don't bother
1641     // doing stack squeezing (which can be expensive), just run the
1642     // thread.
1643     if (t->what_next != prev_what_next) {
1644         return rtsTrue;
1645     }
1646     
1647 #if defined(GRAN)
1648     ASSERT(!is_on_queue(t,CurrentProc));
1649       
1650     IF_DEBUG(sanity,
1651              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1652              checkThreadQsSanity(rtsTrue));
1653
1654 #endif
1655
1656     addToRunQueue(cap,t);
1657
1658 #if defined(GRAN)
1659     /* add a ContinueThread event to actually process the thread */
1660     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1661               ContinueThread,
1662               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1663     IF_GRAN_DEBUG(bq, 
1664                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1665                   G_EVENTQ(0);
1666                   G_CURR_THREADQ(0));
1667 #endif
1668     return rtsFalse;
1669 }
1670
1671 /* -----------------------------------------------------------------------------
1672  * Handle a thread that returned to the scheduler with ThreadBlocked
1673  * -------------------------------------------------------------------------- */
1674
1675 static void
1676 scheduleHandleThreadBlocked( StgTSO *t
1677 #if !defined(GRAN) && !defined(DEBUG)
1678     STG_UNUSED
1679 #endif
1680     )
1681 {
1682 #if defined(GRAN)
1683     IF_DEBUG(scheduler,
1684              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1685                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1686              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1687     
1688     // ??? needed; should emit block before
1689     IF_DEBUG(gran, 
1690              DumpGranEvent(GR_DESCHEDULE, t)); 
1691     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1692     /*
1693       ngoq Dogh!
1694       ASSERT(procStatus[CurrentProc]==Busy || 
1695       ((procStatus[CurrentProc]==Fetching) && 
1696       (t->block_info.closure!=(StgClosure*)NULL)));
1697       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1698       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1699       procStatus[CurrentProc]==Fetching)) 
1700       procStatus[CurrentProc] = Idle;
1701     */
1702 #elif defined(PAR)
1703     IF_DEBUG(scheduler,
1704              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1705                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1706     IF_PAR_DEBUG(bq,
1707                  
1708                  if (t->block_info.closure!=(StgClosure*)NULL) 
1709                  print_bq(t->block_info.closure));
1710     
1711     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1712     blockThread(t);
1713     
1714     /* whatever we schedule next, we must log that schedule */
1715     emitSchedule = rtsTrue;
1716     
1717 #else /* !GRAN */
1718
1719       // We don't need to do anything.  The thread is blocked, and it
1720       // has tidied up its stack and placed itself on whatever queue
1721       // it needs to be on.
1722
1723 #if !defined(SMP)
1724     ASSERT(t->why_blocked != NotBlocked);
1725              // This might not be true under SMP: we don't have
1726              // exclusive access to this TSO, so someone might have
1727              // woken it up by now.  This actually happens: try
1728              // conc023 +RTS -N2.
1729 #endif
1730
1731     IF_DEBUG(scheduler,
1732              debugBelch("--<< thread %d (%s) stopped: ", 
1733                         t->id, whatNext_strs[t->what_next]);
1734              printThreadBlockage(t);
1735              debugBelch("\n"));
1736     
1737     /* Only for dumping event to log file 
1738        ToDo: do I need this in GranSim, too?
1739        blockThread(t);
1740     */
1741 #endif
1742 }
1743
1744 /* -----------------------------------------------------------------------------
1745  * Handle a thread that returned to the scheduler with ThreadFinished
1746  * -------------------------------------------------------------------------- */
1747
1748 static rtsBool
1749 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1750 {
1751     /* Need to check whether this was a main thread, and if so,
1752      * return with the return value.
1753      *
1754      * We also end up here if the thread kills itself with an
1755      * uncaught exception, see Exception.cmm.
1756      */
1757     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1758                                   t->id, whatNext_strs[t->what_next]));
1759
1760 #if defined(GRAN)
1761       endThread(t, CurrentProc); // clean-up the thread
1762 #elif defined(PARALLEL_HASKELL)
1763       /* For now all are advisory -- HWL */
1764       //if(t->priority==AdvisoryPriority) ??
1765       advisory_thread_count--; // JB: Caution with this counter, buggy!
1766       
1767 # if defined(DIST)
1768       if(t->dist.priority==RevalPriority)
1769         FinishReval(t);
1770 # endif
1771     
1772 # if defined(EDENOLD)
1773       // the thread could still have an outport... (BUG)
1774       if (t->eden.outport != -1) {
1775       // delete the outport for the tso which has finished...
1776         IF_PAR_DEBUG(eden_ports,
1777                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1778                               t->eden.outport, t->id));
1779         deleteOPT(t);
1780       }
1781       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1782       if (t->eden.epid != -1) {
1783         IF_PAR_DEBUG(eden_ports,
1784                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1785                            t->id, t->eden.epid));
1786         removeTSOfromProcess(t);
1787       }
1788 # endif 
1789
1790 # if defined(PAR)
1791       if (RtsFlags.ParFlags.ParStats.Full &&
1792           !RtsFlags.ParFlags.ParStats.Suppressed) 
1793         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1794
1795       //  t->par only contains statistics: left out for now...
1796       IF_PAR_DEBUG(fish,
1797                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1798                               t->id,t,t->par.sparkname));
1799 # endif
1800 #endif // PARALLEL_HASKELL
1801
1802       //
1803       // Check whether the thread that just completed was a bound
1804       // thread, and if so return with the result.  
1805       //
1806       // There is an assumption here that all thread completion goes
1807       // through this point; we need to make sure that if a thread
1808       // ends up in the ThreadKilled state, that it stays on the run
1809       // queue so it can be dealt with here.
1810       //
1811
1812       if (t->bound) {
1813
1814           if (t->bound != task) {
1815 #if !defined(THREADED_RTS)
1816               // Must be a bound thread that is not the topmost one.  Leave
1817               // it on the run queue until the stack has unwound to the
1818               // point where we can deal with this.  Leaving it on the run
1819               // queue also ensures that the garbage collector knows about
1820               // this thread and its return value (it gets dropped from the
1821               // all_threads list so there's no other way to find it).
1822               appendToRunQueue(cap,t);
1823               return rtsFalse;
1824 #else
1825               // this cannot happen in the threaded RTS, because a
1826               // bound thread can only be run by the appropriate Task.
1827               barf("finished bound thread that isn't mine");
1828 #endif
1829           }
1830
1831           ASSERT(task->tso == t);
1832
1833           if (t->what_next == ThreadComplete) {
1834               if (task->ret) {
1835                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1836                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1837               }
1838               task->stat = Success;
1839           } else {
1840               if (task->ret) {
1841                   *(task->ret) = NULL;
1842               }
1843               if (interrupted) {
1844                   task->stat = Interrupted;
1845               } else {
1846                   task->stat = Killed;
1847               }
1848           }
1849 #ifdef DEBUG
1850           removeThreadLabel((StgWord)task->tso->id);
1851 #endif
1852           return rtsTrue; // tells schedule() to return
1853       }
1854
1855       return rtsFalse;
1856 }
1857
1858 /* -----------------------------------------------------------------------------
1859  * Perform a heap census, if PROFILING
1860  * -------------------------------------------------------------------------- */
1861
1862 static rtsBool
1863 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1864 {
1865 #if defined(PROFILING)
1866     // When we have +RTS -i0 and we're heap profiling, do a census at
1867     // every GC.  This lets us get repeatable runs for debugging.
1868     if (performHeapProfile ||
1869         (RtsFlags.ProfFlags.profileInterval==0 &&
1870          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1871         GarbageCollect(GetRoots, rtsTrue);
1872         heapCensus();
1873         performHeapProfile = rtsFalse;
1874         return rtsTrue;  // true <=> we already GC'd
1875     }
1876 #endif
1877     return rtsFalse;
1878 }
1879
1880 /* -----------------------------------------------------------------------------
1881  * Perform a garbage collection if necessary
1882  * -------------------------------------------------------------------------- */
1883
1884 static void
1885 scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
1886 {
1887     StgTSO *t;
1888 #ifdef SMP
1889     static volatile StgWord waiting_for_gc;
1890     rtsBool was_waiting;
1891     nat i;
1892 #endif
1893
1894 #ifdef SMP
1895     // In order to GC, there must be no threads running Haskell code.
1896     // Therefore, the GC thread needs to hold *all* the capabilities,
1897     // and release them after the GC has completed.  
1898     //
1899     // This seems to be the simplest way: previous attempts involved
1900     // making all the threads with capabilities give up their
1901     // capabilities and sleep except for the *last* one, which
1902     // actually did the GC.  But it's quite hard to arrange for all
1903     // the other tasks to sleep and stay asleep.
1904     //
1905         
1906     was_waiting = cas(&waiting_for_gc, 0, 1);
1907     if (was_waiting) {
1908         do {
1909             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1910             yieldCapability(&cap,task);
1911         } while (waiting_for_gc);
1912         return;
1913     }
1914
1915     for (i=0; i < n_capabilities; i++) {
1916         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1917         if (cap != &capabilities[i]) {
1918             Capability *pcap = &capabilities[i];
1919             // we better hope this task doesn't get migrated to
1920             // another Capability while we're waiting for this one.
1921             // It won't, because load balancing happens while we have
1922             // all the Capabilities, but even so it's a slightly
1923             // unsavoury invariant.
1924             task->cap = pcap;
1925             context_switch = 1;
1926             waitForReturnCapability(&pcap, task);
1927             if (pcap != &capabilities[i]) {
1928                 barf("scheduleDoGC: got the wrong capability");
1929             }
1930         }
1931     }
1932
1933     waiting_for_gc = rtsFalse;
1934 #endif
1935
1936     /* Kick any transactions which are invalid back to their
1937      * atomically frames.  When next scheduled they will try to
1938      * commit, this commit will fail and they will retry.
1939      */
1940     { 
1941         StgTSO *next;
1942
1943         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1944             if (t->what_next == ThreadRelocated) {
1945                 next = t->link;
1946             } else {
1947                 next = t->global_link;
1948                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1949                     if (!stmValidateNestOfTransactions (t -> trec)) {
1950                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1951                         
1952                         // strip the stack back to the
1953                         // ATOMICALLY_FRAME, aborting the (nested)
1954                         // transaction, and saving the stack of any
1955                         // partially-evaluated thunks on the heap.
1956                         raiseAsync_(cap, t, NULL, rtsTrue, NULL);
1957                         
1958 #ifdef REG_R1
1959                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1960 #endif
1961                     }
1962                 }
1963             }
1964         }
1965     }
1966     
1967     // so this happens periodically:
1968     scheduleCheckBlackHoles(cap);
1969     
1970     IF_DEBUG(scheduler, printAllThreads());
1971
1972     /* everybody back, start the GC.
1973      * Could do it in this thread, or signal a condition var
1974      * to do it in another thread.  Either way, we need to
1975      * broadcast on gc_pending_cond afterward.
1976      */
1977 #if defined(THREADED_RTS)
1978     IF_DEBUG(scheduler,sched_belch("doing GC"));
1979 #endif
1980     GarbageCollect(GetRoots, force_major);
1981     
1982 #if defined(SMP)
1983     // release our stash of capabilities.
1984     for (i = 0; i < n_capabilities; i++) {
1985         if (cap != &capabilities[i]) {
1986             task->cap = &capabilities[i];
1987             releaseCapability(&capabilities[i]);
1988         }
1989     }
1990     task->cap = cap;
1991 #endif
1992
1993 #if defined(GRAN)
1994     /* add a ContinueThread event to continue execution of current thread */
1995     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1996               ContinueThread,
1997               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1998     IF_GRAN_DEBUG(bq, 
1999                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2000                   G_EVENTQ(0);
2001                   G_CURR_THREADQ(0));
2002 #endif /* GRAN */
2003 }
2004
2005 /* ---------------------------------------------------------------------------
2006  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
2007  * used by Control.Concurrent for error checking.
2008  * ------------------------------------------------------------------------- */
2009  
2010 StgBool
2011 rtsSupportsBoundThreads(void)
2012 {
2013 #if defined(THREADED_RTS)
2014   return rtsTrue;
2015 #else
2016   return rtsFalse;
2017 #endif
2018 }
2019
2020 /* ---------------------------------------------------------------------------
2021  * isThreadBound(tso): check whether tso is bound to an OS thread.
2022  * ------------------------------------------------------------------------- */
2023  
2024 StgBool
2025 isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
2026 {
2027 #if defined(THREADED_RTS)
2028   return (tso->bound != NULL);
2029 #endif
2030   return rtsFalse;
2031 }
2032
2033 /* ---------------------------------------------------------------------------
2034  * Singleton fork(). Do not copy any running threads.
2035  * ------------------------------------------------------------------------- */
2036
2037 #if !defined(mingw32_HOST_OS) && !defined(SMP)
2038 #define FORKPROCESS_PRIMOP_SUPPORTED
2039 #endif
2040
2041 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2042 static void 
2043 deleteThreadImmediately(Capability *cap, StgTSO *tso);
2044 #endif
2045 StgInt
2046 forkProcess(HsStablePtr *entry
2047 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2048             STG_UNUSED
2049 #endif
2050            )
2051 {
2052 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2053     Task *task;
2054     pid_t pid;
2055     StgTSO* t,*next;
2056     Capability *cap;
2057     
2058     IF_DEBUG(scheduler,sched_belch("forking!"));
2059     
2060     // ToDo: for SMP, we should probably acquire *all* the capabilities
2061     cap = rts_lock();
2062     
2063     pid = fork();
2064     
2065     if (pid) { // parent
2066         
2067         // just return the pid
2068         rts_unlock(cap);
2069         return pid;
2070         
2071     } else { // child
2072         
2073         // delete all threads
2074         cap->run_queue_hd = END_TSO_QUEUE;
2075         cap->run_queue_tl = END_TSO_QUEUE;
2076         
2077         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2078             next = t->link;
2079             
2080             // don't allow threads to catch the ThreadKilled exception
2081             deleteThreadImmediately(cap,t);
2082         }
2083         
2084         // wipe the task list
2085         ACQUIRE_LOCK(&sched_mutex);
2086         for (task = all_tasks; task != NULL; task=task->all_link) {
2087             if (task != cap->running_task) discardTask(task);
2088         }
2089         RELEASE_LOCK(&sched_mutex);
2090
2091         cap->suspended_ccalling_tasks = NULL;
2092
2093 #if defined(THREADED_RTS)
2094         // wipe our spare workers list.
2095         cap->spare_workers = NULL;
2096         cap->returning_tasks_hd = NULL;
2097         cap->returning_tasks_tl = NULL;
2098 #endif
2099
2100         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2101         rts_checkSchedStatus("forkProcess",cap);
2102         
2103         rts_unlock(cap);
2104         hs_exit();                      // clean up and exit
2105         stg_exit(EXIT_SUCCESS);
2106     }
2107 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2108     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2109     return -1;
2110 #endif
2111 }
2112
2113 /* ---------------------------------------------------------------------------
2114  * Delete the threads on the run queue of the current capability.
2115  * ------------------------------------------------------------------------- */
2116    
2117 static void
2118 deleteRunQueue (Capability *cap)
2119 {
2120     StgTSO *t, *next;
2121     for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2122         ASSERT(t->what_next != ThreadRelocated);
2123         next = t->link;
2124         deleteThread(cap, t);
2125     }
2126 }
2127
2128 /* startThread and  insertThread are now in GranSim.c -- HWL */
2129
2130
2131 /* -----------------------------------------------------------------------------
2132    Managing the suspended_ccalling_tasks list.
2133    Locks required: sched_mutex
2134    -------------------------------------------------------------------------- */
2135
2136 STATIC_INLINE void
2137 suspendTask (Capability *cap, Task *task)
2138 {
2139     ASSERT(task->next == NULL && task->prev == NULL);
2140     task->next = cap->suspended_ccalling_tasks;
2141     task->prev = NULL;
2142     if (cap->suspended_ccalling_tasks) {
2143         cap->suspended_ccalling_tasks->prev = task;
2144     }
2145     cap->suspended_ccalling_tasks = task;
2146 }
2147
2148 STATIC_INLINE void
2149 recoverSuspendedTask (Capability *cap, Task *task)
2150 {
2151     if (task->prev) {
2152         task->prev->next = task->next;
2153     } else {
2154         ASSERT(cap->suspended_ccalling_tasks == task);
2155         cap->suspended_ccalling_tasks = task->next;
2156     }
2157     if (task->next) {
2158         task->next->prev = task->prev;
2159     }
2160     task->next = task->prev = NULL;
2161 }
2162
2163 /* ---------------------------------------------------------------------------
2164  * Suspending & resuming Haskell threads.
2165  * 
2166  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2167  * its capability before calling the C function.  This allows another
2168  * task to pick up the capability and carry on running Haskell
2169  * threads.  It also means that if the C call blocks, it won't lock
2170  * the whole system.
2171  *
2172  * The Haskell thread making the C call is put to sleep for the
2173  * duration of the call, on the susepended_ccalling_threads queue.  We
2174  * give out a token to the task, which it can use to resume the thread
2175  * on return from the C function.
2176  * ------------------------------------------------------------------------- */
2177    
2178 void *
2179 suspendThread (StgRegTable *reg)
2180 {
2181   Capability *cap;
2182   int saved_errno = errno;
2183   StgTSO *tso;
2184   Task *task;
2185
2186   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2187    */
2188   cap = regTableToCapability(reg);
2189
2190   task = cap->running_task;
2191   tso = cap->r.rCurrentTSO;
2192
2193   IF_DEBUG(scheduler,
2194            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2195
2196   // XXX this might not be necessary --SDM
2197   tso->what_next = ThreadRunGHC;
2198
2199   threadPaused(cap,tso);
2200
2201   if(tso->blocked_exceptions == NULL)  {
2202       tso->why_blocked = BlockedOnCCall;
2203       tso->blocked_exceptions = END_TSO_QUEUE;
2204   } else {
2205       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2206   }
2207
2208   // Hand back capability
2209   task->suspended_tso = tso;
2210
2211   ACQUIRE_LOCK(&cap->lock);
2212
2213   suspendTask(cap,task);
2214   cap->in_haskell = rtsFalse;
2215   releaseCapability_(cap);
2216   
2217   RELEASE_LOCK(&cap->lock);
2218
2219 #if defined(THREADED_RTS)
2220   /* Preparing to leave the RTS, so ensure there's a native thread/task
2221      waiting to take over.
2222   */
2223   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2224 #endif
2225
2226   errno = saved_errno;
2227   return task;
2228 }
2229
2230 StgRegTable *
2231 resumeThread (void *task_)
2232 {
2233     StgTSO *tso;
2234     Capability *cap;
2235     int saved_errno = errno;
2236     Task *task = task_;
2237
2238     cap = task->cap;
2239     // Wait for permission to re-enter the RTS with the result.
2240     waitForReturnCapability(&cap,task);
2241     // we might be on a different capability now... but if so, our
2242     // entry on the suspended_ccalling_tasks list will also have been
2243     // migrated.
2244
2245     // Remove the thread from the suspended list
2246     recoverSuspendedTask(cap,task);
2247
2248     tso = task->suspended_tso;
2249     task->suspended_tso = NULL;
2250     tso->link = END_TSO_QUEUE;
2251     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2252     
2253     if (tso->why_blocked == BlockedOnCCall) {
2254         awakenBlockedQueue(cap,tso->blocked_exceptions);
2255         tso->blocked_exceptions = NULL;
2256     }
2257     
2258     /* Reset blocking status */
2259     tso->why_blocked  = NotBlocked;
2260     
2261     cap->r.rCurrentTSO = tso;
2262     cap->in_haskell = rtsTrue;
2263     errno = saved_errno;
2264
2265     return &cap->r;
2266 }
2267
2268 /* ---------------------------------------------------------------------------
2269  * Comparing Thread ids.
2270  *
2271  * This is used from STG land in the implementation of the
2272  * instances of Eq/Ord for ThreadIds.
2273  * ------------------------------------------------------------------------ */
2274
2275 int
2276 cmp_thread(StgPtr tso1, StgPtr tso2) 
2277
2278   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2279   StgThreadID id2 = ((StgTSO *)tso2)->id;
2280  
2281   if (id1 < id2) return (-1);
2282   if (id1 > id2) return 1;
2283   return 0;
2284 }
2285
2286 /* ---------------------------------------------------------------------------
2287  * Fetching the ThreadID from an StgTSO.
2288  *
2289  * This is used in the implementation of Show for ThreadIds.
2290  * ------------------------------------------------------------------------ */
2291 int
2292 rts_getThreadId(StgPtr tso) 
2293 {
2294   return ((StgTSO *)tso)->id;
2295 }
2296
2297 #ifdef DEBUG
2298 void
2299 labelThread(StgPtr tso, char *label)
2300 {
2301   int len;
2302   void *buf;
2303
2304   /* Caveat: Once set, you can only set the thread name to "" */
2305   len = strlen(label)+1;
2306   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2307   strncpy(buf,label,len);
2308   /* Update will free the old memory for us */
2309   updateThreadLabel(((StgTSO *)tso)->id,buf);
2310 }
2311 #endif /* DEBUG */
2312
2313 /* ---------------------------------------------------------------------------
2314    Create a new thread.
2315
2316    The new thread starts with the given stack size.  Before the
2317    scheduler can run, however, this thread needs to have a closure
2318    (and possibly some arguments) pushed on its stack.  See
2319    pushClosure() in Schedule.h.
2320
2321    createGenThread() and createIOThread() (in SchedAPI.h) are
2322    convenient packaged versions of this function.
2323
2324    currently pri (priority) is only used in a GRAN setup -- HWL
2325    ------------------------------------------------------------------------ */
2326 #if defined(GRAN)
2327 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2328 StgTSO *
2329 createThread(nat size, StgInt pri)
2330 #else
2331 StgTSO *
2332 createThread(Capability *cap, nat size)
2333 #endif
2334 {
2335     StgTSO *tso;
2336     nat stack_size;
2337
2338     /* sched_mutex is *not* required */
2339
2340     /* First check whether we should create a thread at all */
2341 #if defined(PARALLEL_HASKELL)
2342     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2343     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2344         threadsIgnored++;
2345         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2346                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2347         return END_TSO_QUEUE;
2348     }
2349     threadsCreated++;
2350 #endif
2351
2352 #if defined(GRAN)
2353     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2354 #endif
2355
2356     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2357
2358     /* catch ridiculously small stack sizes */
2359     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2360         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2361     }
2362
2363     stack_size = size - TSO_STRUCT_SIZEW;
2364     
2365     tso = (StgTSO *)allocateLocal(cap, size);
2366     TICK_ALLOC_TSO(stack_size, 0);
2367
2368     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2369 #if defined(GRAN)
2370     SET_GRAN_HDR(tso, ThisPE);
2371 #endif
2372
2373     // Always start with the compiled code evaluator
2374     tso->what_next = ThreadRunGHC;
2375
2376     tso->why_blocked  = NotBlocked;
2377     tso->blocked_exceptions = NULL;
2378     
2379     tso->saved_errno = 0;
2380     tso->bound = NULL;
2381     
2382     tso->stack_size     = stack_size;
2383     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2384                           - TSO_STRUCT_SIZEW;
2385     tso->sp             = (P_)&(tso->stack) + stack_size;
2386
2387     tso->trec = NO_TREC;
2388     
2389 #ifdef PROFILING
2390     tso->prof.CCCS = CCS_MAIN;
2391 #endif
2392     
2393   /* put a stop frame on the stack */
2394     tso->sp -= sizeofW(StgStopFrame);
2395     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2396     tso->link = END_TSO_QUEUE;
2397     
2398   // ToDo: check this
2399 #if defined(GRAN)
2400     /* uses more flexible routine in GranSim */
2401     insertThread(tso, CurrentProc);
2402 #else
2403     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2404      * from its creation
2405      */
2406 #endif
2407     
2408 #if defined(GRAN) 
2409     if (RtsFlags.GranFlags.GranSimStats.Full) 
2410         DumpGranEvent(GR_START,tso);
2411 #elif defined(PARALLEL_HASKELL)
2412     if (RtsFlags.ParFlags.ParStats.Full) 
2413         DumpGranEvent(GR_STARTQ,tso);
2414     /* HACk to avoid SCHEDULE 
2415        LastTSO = tso; */
2416 #endif
2417     
2418     /* Link the new thread on the global thread list.
2419      */
2420     ACQUIRE_LOCK(&sched_mutex);
2421     tso->id = next_thread_id++;  // while we have the mutex
2422     tso->global_link = all_threads;
2423     all_threads = tso;
2424     RELEASE_LOCK(&sched_mutex);
2425     
2426 #if defined(DIST)
2427     tso->dist.priority = MandatoryPriority; //by default that is...
2428 #endif
2429     
2430 #if defined(GRAN)
2431     tso->gran.pri = pri;
2432 # if defined(DEBUG)
2433     tso->gran.magic = TSO_MAGIC; // debugging only
2434 # endif
2435     tso->gran.sparkname   = 0;
2436     tso->gran.startedat   = CURRENT_TIME; 
2437     tso->gran.exported    = 0;
2438     tso->gran.basicblocks = 0;
2439     tso->gran.allocs      = 0;
2440     tso->gran.exectime    = 0;
2441     tso->gran.fetchtime   = 0;
2442     tso->gran.fetchcount  = 0;
2443     tso->gran.blocktime   = 0;
2444     tso->gran.blockcount  = 0;
2445     tso->gran.blockedat   = 0;
2446     tso->gran.globalsparks = 0;
2447     tso->gran.localsparks  = 0;
2448     if (RtsFlags.GranFlags.Light)
2449         tso->gran.clock  = Now; /* local clock */
2450     else
2451         tso->gran.clock  = 0;
2452     
2453     IF_DEBUG(gran,printTSO(tso));
2454 #elif defined(PARALLEL_HASKELL)
2455 # if defined(DEBUG)
2456     tso->par.magic = TSO_MAGIC; // debugging only
2457 # endif
2458     tso->par.sparkname   = 0;
2459     tso->par.startedat   = CURRENT_TIME; 
2460     tso->par.exported    = 0;
2461     tso->par.basicblocks = 0;
2462     tso->par.allocs      = 0;
2463     tso->par.exectime    = 0;
2464     tso->par.fetchtime   = 0;
2465     tso->par.fetchcount  = 0;
2466     tso->par.blocktime   = 0;
2467     tso->par.blockcount  = 0;
2468     tso->par.blockedat   = 0;
2469     tso->par.globalsparks = 0;
2470     tso->par.localsparks  = 0;
2471 #endif
2472     
2473 #if defined(GRAN)
2474     globalGranStats.tot_threads_created++;
2475     globalGranStats.threads_created_on_PE[CurrentProc]++;
2476     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2477     globalGranStats.tot_sq_probes++;
2478 #elif defined(PARALLEL_HASKELL)
2479     // collect parallel global statistics (currently done together with GC stats)
2480     if (RtsFlags.ParFlags.ParStats.Global &&
2481         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2482         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2483         globalParStats.tot_threads_created++;
2484     }
2485 #endif 
2486     
2487 #if defined(GRAN)
2488     IF_GRAN_DEBUG(pri,
2489                   sched_belch("==__ schedule: Created TSO %d (%p);",
2490                               CurrentProc, tso, tso->id));
2491 #elif defined(PARALLEL_HASKELL)
2492     IF_PAR_DEBUG(verbose,
2493                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2494                              (long)tso->id, tso, advisory_thread_count));
2495 #else
2496     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2497                                    (long)tso->id, (long)tso->stack_size));
2498 #endif    
2499     return tso;
2500 }
2501
2502 #if defined(PAR)
2503 /* RFP:
2504    all parallel thread creation calls should fall through the following routine.
2505 */
2506 StgTSO *
2507 createThreadFromSpark(rtsSpark spark) 
2508 { StgTSO *tso;
2509   ASSERT(spark != (rtsSpark)NULL);
2510 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2511   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2512   { threadsIgnored++;
2513     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2514           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2515     return END_TSO_QUEUE;
2516   }
2517   else
2518   { threadsCreated++;
2519     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2520     if (tso==END_TSO_QUEUE)     
2521       barf("createSparkThread: Cannot create TSO");
2522 #if defined(DIST)
2523     tso->priority = AdvisoryPriority;
2524 #endif
2525     pushClosure(tso,spark);
2526     addToRunQueue(tso);
2527     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2528   }
2529   return tso;
2530 }
2531 #endif
2532
2533 /*
2534   Turn a spark into a thread.
2535   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2536 */
2537 #if 0
2538 StgTSO *
2539 activateSpark (rtsSpark spark) 
2540 {
2541   StgTSO *tso;
2542
2543   tso = createSparkThread(spark);
2544   if (RtsFlags.ParFlags.ParStats.Full) {   
2545     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2546       IF_PAR_DEBUG(verbose,
2547                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2548                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2549   }
2550   // ToDo: fwd info on local/global spark to thread -- HWL
2551   // tso->gran.exported =  spark->exported;
2552   // tso->gran.locked =   !spark->global;
2553   // tso->gran.sparkname = spark->name;
2554
2555   return tso;
2556 }
2557 #endif
2558
2559 /* ---------------------------------------------------------------------------
2560  * scheduleThread()
2561  *
2562  * scheduleThread puts a thread on the end  of the runnable queue.
2563  * This will usually be done immediately after a thread is created.
2564  * The caller of scheduleThread must create the thread using e.g.
2565  * createThread and push an appropriate closure
2566  * on this thread's stack before the scheduler is invoked.
2567  * ------------------------------------------------------------------------ */
2568
2569 void
2570 scheduleThread(Capability *cap, StgTSO *tso)
2571 {
2572     // The thread goes at the *end* of the run-queue, to avoid possible
2573     // starvation of any threads already on the queue.
2574     appendToRunQueue(cap,tso);
2575 }
2576
2577 Capability *
2578 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2579 {
2580     Task *task;
2581
2582     // We already created/initialised the Task
2583     task = cap->running_task;
2584
2585     // This TSO is now a bound thread; make the Task and TSO
2586     // point to each other.
2587     tso->bound = task;
2588
2589     task->tso = tso;
2590     task->ret = ret;
2591     task->stat = NoStatus;
2592
2593     appendToRunQueue(cap,tso);
2594
2595     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2596
2597 #if defined(GRAN)
2598     /* GranSim specific init */
2599     CurrentTSO = m->tso;                // the TSO to run
2600     procStatus[MainProc] = Busy;        // status of main PE
2601     CurrentProc = MainProc;             // PE to run it on
2602 #endif
2603
2604     cap = schedule(cap,task);
2605
2606     ASSERT(task->stat != NoStatus);
2607     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2608
2609     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2610     return cap;
2611 }
2612
2613 /* ----------------------------------------------------------------------------
2614  * Starting Tasks
2615  * ------------------------------------------------------------------------- */
2616
2617 #if defined(THREADED_RTS)
2618 void
2619 workerStart(Task *task)
2620 {
2621     Capability *cap;
2622
2623     // See startWorkerTask().
2624     ACQUIRE_LOCK(&task->lock);
2625     cap = task->cap;
2626     RELEASE_LOCK(&task->lock);
2627
2628     // set the thread-local pointer to the Task:
2629     taskEnter(task);
2630
2631     // schedule() runs without a lock.
2632     cap = schedule(cap,task);
2633
2634     // On exit from schedule(), we have a Capability.
2635     releaseCapability(cap);
2636     taskStop(task);
2637 }
2638 #endif
2639
2640 /* ---------------------------------------------------------------------------
2641  * initScheduler()
2642  *
2643  * Initialise the scheduler.  This resets all the queues - if the
2644  * queues contained any threads, they'll be garbage collected at the
2645  * next pass.
2646  *
2647  * ------------------------------------------------------------------------ */
2648
2649 void 
2650 initScheduler(void)
2651 {
2652 #if defined(GRAN)
2653   nat i;
2654   for (i=0; i<=MAX_PROC; i++) {
2655     run_queue_hds[i]      = END_TSO_QUEUE;
2656     run_queue_tls[i]      = END_TSO_QUEUE;
2657     blocked_queue_hds[i]  = END_TSO_QUEUE;
2658     blocked_queue_tls[i]  = END_TSO_QUEUE;
2659     ccalling_threadss[i]  = END_TSO_QUEUE;
2660     blackhole_queue[i]    = END_TSO_QUEUE;
2661     sleeping_queue        = END_TSO_QUEUE;
2662   }
2663 #elif !defined(THREADED_RTS)
2664   blocked_queue_hd  = END_TSO_QUEUE;
2665   blocked_queue_tl  = END_TSO_QUEUE;
2666   sleeping_queue    = END_TSO_QUEUE;
2667 #endif
2668
2669   blackhole_queue   = END_TSO_QUEUE;
2670   all_threads       = END_TSO_QUEUE;
2671
2672   context_switch = 0;
2673   interrupted    = 0;
2674
2675   RtsFlags.ConcFlags.ctxtSwitchTicks =
2676       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2677       
2678 #if defined(THREADED_RTS)
2679   /* Initialise the mutex and condition variables used by
2680    * the scheduler. */
2681   initMutex(&sched_mutex);
2682 #endif
2683   
2684   ACQUIRE_LOCK(&sched_mutex);
2685
2686   /* A capability holds the state a native thread needs in
2687    * order to execute STG code. At least one capability is
2688    * floating around (only SMP builds have more than one).
2689    */
2690   initCapabilities();
2691
2692   initTaskManager();
2693
2694 #if defined(SMP) || defined(PARALLEL_HASKELL)
2695   initSparkPools();
2696 #endif
2697
2698 #if defined(SMP)
2699   /*
2700    * Eagerly start one worker to run each Capability, except for
2701    * Capability 0.  The idea is that we're probably going to start a
2702    * bound thread on Capability 0 pretty soon, so we don't want a
2703    * worker task hogging it.
2704    */
2705   { 
2706       nat i;
2707       Capability *cap;
2708       for (i = 1; i < n_capabilities; i++) {
2709           cap = &capabilities[i];
2710           ACQUIRE_LOCK(&cap->lock);
2711           startWorkerTask(cap, workerStart);
2712           RELEASE_LOCK(&cap->lock);
2713       }
2714   }
2715 #endif
2716
2717   RELEASE_LOCK(&sched_mutex);
2718 }
2719
2720 void
2721 exitScheduler( void )
2722 {
2723     interrupted = rtsTrue;
2724     shutting_down_scheduler = rtsTrue;
2725
2726 #if defined(THREADED_RTS)
2727     { 
2728         Task *task;
2729         nat i;
2730         
2731         ACQUIRE_LOCK(&sched_mutex);
2732         task = newBoundTask();
2733         RELEASE_LOCK(&sched_mutex);
2734
2735         for (i = 0; i < n_capabilities; i++) {
2736             shutdownCapability(&capabilities[i], task);
2737         }
2738         boundTaskExiting(task);
2739         stopTaskManager();
2740     }
2741 #endif
2742 }
2743
2744 /* ---------------------------------------------------------------------------
2745    Where are the roots that we know about?
2746
2747         - all the threads on the runnable queue
2748         - all the threads on the blocked queue
2749         - all the threads on the sleeping queue
2750         - all the thread currently executing a _ccall_GC
2751         - all the "main threads"
2752      
2753    ------------------------------------------------------------------------ */
2754
2755 /* This has to be protected either by the scheduler monitor, or by the
2756         garbage collection monitor (probably the latter).
2757         KH @ 25/10/99
2758 */
2759
2760 void
2761 GetRoots( evac_fn evac )
2762 {
2763     nat i;
2764     Capability *cap;
2765     Task *task;
2766
2767 #if defined(GRAN)
2768     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2769         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2770             evac((StgClosure **)&run_queue_hds[i]);
2771         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2772             evac((StgClosure **)&run_queue_tls[i]);
2773         
2774         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2775             evac((StgClosure **)&blocked_queue_hds[i]);
2776         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2777             evac((StgClosure **)&blocked_queue_tls[i]);
2778         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2779             evac((StgClosure **)&ccalling_threads[i]);
2780     }
2781
2782     markEventQueue();
2783
2784 #else /* !GRAN */
2785
2786     for (i = 0; i < n_capabilities; i++) {
2787         cap = &capabilities[i];
2788         evac((StgClosure **)&cap->run_queue_hd);
2789         evac((StgClosure **)&cap->run_queue_tl);
2790         
2791         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2792              task=task->next) {
2793             evac((StgClosure **)&task->suspended_tso);
2794         }
2795     }
2796     
2797 #if !defined(THREADED_RTS)
2798     evac((StgClosure **)&blocked_queue_hd);
2799     evac((StgClosure **)&blocked_queue_tl);
2800     evac((StgClosure **)&sleeping_queue);
2801 #endif 
2802 #endif
2803
2804     evac((StgClosure **)&blackhole_queue);
2805
2806 #if defined(SMP) || defined(PARALLEL_HASKELL) || defined(GRAN)
2807     markSparkQueue(evac);
2808 #endif
2809     
2810 #if defined(RTS_USER_SIGNALS)
2811     // mark the signal handlers (signals should be already blocked)
2812     markSignalHandlers(evac);
2813 #endif
2814 }
2815
2816 /* -----------------------------------------------------------------------------
2817    performGC
2818
2819    This is the interface to the garbage collector from Haskell land.
2820    We provide this so that external C code can allocate and garbage
2821    collect when called from Haskell via _ccall_GC.
2822
2823    It might be useful to provide an interface whereby the programmer
2824    can specify more roots (ToDo).
2825    
2826    This needs to be protected by the GC condition variable above.  KH.
2827    -------------------------------------------------------------------------- */
2828
2829 static void (*extra_roots)(evac_fn);
2830
2831 void
2832 performGC(void)
2833 {
2834 #ifdef THREADED_RTS
2835     // ToDo: we have to grab all the capabilities here.
2836     errorBelch("performGC not supported in threaded RTS (yet)");
2837     stg_exit(EXIT_FAILURE);
2838 #endif
2839     /* Obligated to hold this lock upon entry */
2840     GarbageCollect(GetRoots,rtsFalse);
2841 }
2842
2843 void
2844 performMajorGC(void)
2845 {
2846 #ifdef THREADED_RTS
2847     errorBelch("performMayjorGC not supported in threaded RTS (yet)");
2848     stg_exit(EXIT_FAILURE);
2849 #endif
2850     GarbageCollect(GetRoots,rtsTrue);
2851 }
2852
2853 static void
2854 AllRoots(evac_fn evac)
2855 {
2856     GetRoots(evac);             // the scheduler's roots
2857     extra_roots(evac);          // the user's roots
2858 }
2859
2860 void
2861 performGCWithRoots(void (*get_roots)(evac_fn))
2862 {
2863 #ifdef THREADED_RTS
2864     errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
2865     stg_exit(EXIT_FAILURE);
2866 #endif
2867     extra_roots = get_roots;
2868     GarbageCollect(AllRoots,rtsFalse);
2869 }
2870
2871 /* -----------------------------------------------------------------------------
2872    Stack overflow
2873
2874    If the thread has reached its maximum stack size, then raise the
2875    StackOverflow exception in the offending thread.  Otherwise
2876    relocate the TSO into a larger chunk of memory and adjust its stack
2877    size appropriately.
2878    -------------------------------------------------------------------------- */
2879
2880 static StgTSO *
2881 threadStackOverflow(Capability *cap, StgTSO *tso)
2882 {
2883   nat new_stack_size, stack_words;
2884   lnat new_tso_size;
2885   StgPtr new_sp;
2886   StgTSO *dest;
2887
2888   IF_DEBUG(sanity,checkTSO(tso));
2889   if (tso->stack_size >= tso->max_stack_size) {
2890
2891     IF_DEBUG(gc,
2892              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2893                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2894              /* If we're debugging, just print out the top of the stack */
2895              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2896                                               tso->sp+64)));
2897
2898     /* Send this thread the StackOverflow exception */
2899     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2900     return tso;
2901   }
2902
2903   /* Try to double the current stack size.  If that takes us over the
2904    * maximum stack size for this thread, then use the maximum instead.
2905    * Finally round up so the TSO ends up as a whole number of blocks.
2906    */
2907   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2908   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2909                                        TSO_STRUCT_SIZE)/sizeof(W_);
2910   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2911   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2912
2913   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2914
2915   dest = (StgTSO *)allocate(new_tso_size);
2916   TICK_ALLOC_TSO(new_stack_size,0);
2917
2918   /* copy the TSO block and the old stack into the new area */
2919   memcpy(dest,tso,TSO_STRUCT_SIZE);
2920   stack_words = tso->stack + tso->stack_size - tso->sp;
2921   new_sp = (P_)dest + new_tso_size - stack_words;
2922   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2923
2924   /* relocate the stack pointers... */
2925   dest->sp         = new_sp;
2926   dest->stack_size = new_stack_size;
2927         
2928   /* Mark the old TSO as relocated.  We have to check for relocated
2929    * TSOs in the garbage collector and any primops that deal with TSOs.
2930    *
2931    * It's important to set the sp value to just beyond the end
2932    * of the stack, so we don't attempt to scavenge any part of the
2933    * dead TSO's stack.
2934    */
2935   tso->what_next = ThreadRelocated;
2936   tso->link = dest;
2937   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2938   tso->why_blocked = NotBlocked;
2939
2940   IF_PAR_DEBUG(verbose,
2941                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2942                      tso->id, tso, tso->stack_size);
2943                /* If we're debugging, just print out the top of the stack */
2944                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2945                                                 tso->sp+64)));
2946   
2947   IF_DEBUG(sanity,checkTSO(tso));
2948 #if 0
2949   IF_DEBUG(scheduler,printTSO(dest));
2950 #endif
2951
2952   return dest;
2953 }
2954
2955 /* ---------------------------------------------------------------------------
2956    Wake up a queue that was blocked on some resource.
2957    ------------------------------------------------------------------------ */
2958
2959 #if defined(GRAN)
2960 STATIC_INLINE void
2961 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2962 {
2963 }
2964 #elif defined(PARALLEL_HASKELL)
2965 STATIC_INLINE void
2966 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2967 {
2968   /* write RESUME events to log file and
2969      update blocked and fetch time (depending on type of the orig closure) */
2970   if (RtsFlags.ParFlags.ParStats.Full) {
2971     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2972                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2973                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2974     if (emptyRunQueue())
2975       emitSchedule = rtsTrue;
2976
2977     switch (get_itbl(node)->type) {
2978         case FETCH_ME_BQ:
2979           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2980           break;
2981         case RBH:
2982         case FETCH_ME:
2983         case BLACKHOLE_BQ:
2984           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2985           break;
2986 #ifdef DIST
2987         case MVAR:
2988           break;
2989 #endif    
2990         default:
2991           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2992         }
2993       }
2994 }
2995 #endif
2996
2997 #if defined(GRAN)
2998 StgBlockingQueueElement *
2999 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3000 {
3001     StgTSO *tso;
3002     PEs node_loc, tso_loc;
3003
3004     node_loc = where_is(node); // should be lifted out of loop
3005     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3006     tso_loc = where_is((StgClosure *)tso);
3007     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3008       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
3009       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3010       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3011       // insertThread(tso, node_loc);
3012       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3013                 ResumeThread,
3014                 tso, node, (rtsSpark*)NULL);
3015       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3016       // len_local++;
3017       // len++;
3018     } else { // TSO is remote (actually should be FMBQ)
3019       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3020                                   RtsFlags.GranFlags.Costs.gunblocktime +
3021                                   RtsFlags.GranFlags.Costs.latency;
3022       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3023                 UnblockThread,
3024                 tso, node, (rtsSpark*)NULL);
3025       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3026       // len++;
3027     }
3028     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3029     IF_GRAN_DEBUG(bq,
3030                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3031                           (node_loc==tso_loc ? "Local" : "Global"), 
3032                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3033     tso->block_info.closure = NULL;
3034     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
3035                              tso->id, tso));
3036 }
3037 #elif defined(PARALLEL_HASKELL)
3038 StgBlockingQueueElement *
3039 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3040 {
3041     StgBlockingQueueElement *next;
3042
3043     switch (get_itbl(bqe)->type) {
3044     case TSO:
3045       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3046       /* if it's a TSO just push it onto the run_queue */
3047       next = bqe->link;
3048       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3049       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3050       threadRunnable();
3051       unblockCount(bqe, node);
3052       /* reset blocking status after dumping event */
3053       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3054       break;
3055
3056     case BLOCKED_FETCH:
3057       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3058       next = bqe->link;
3059       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3060       PendingFetches = (StgBlockedFetch *)bqe;
3061       break;
3062
3063 # if defined(DEBUG)
3064       /* can ignore this case in a non-debugging setup; 
3065          see comments on RBHSave closures above */
3066     case CONSTR:
3067       /* check that the closure is an RBHSave closure */
3068       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3069              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3070              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3071       break;
3072
3073     default:
3074       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3075            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3076            (StgClosure *)bqe);
3077 # endif
3078     }
3079   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3080   return next;
3081 }
3082 #endif
3083
3084 StgTSO *
3085 unblockOne(Capability *cap, StgTSO *tso)
3086 {
3087   StgTSO *next;
3088
3089   ASSERT(get_itbl(tso)->type == TSO);
3090   ASSERT(tso->why_blocked != NotBlocked);
3091   tso->why_blocked = NotBlocked;
3092   next = tso->link;
3093   tso->link = END_TSO_QUEUE;
3094
3095   // We might have just migrated this TSO to our Capability:
3096   if (tso->bound) {
3097       tso->bound->cap = cap;
3098   }
3099
3100   appendToRunQueue(cap,tso);
3101
3102   // we're holding a newly woken thread, make sure we context switch
3103   // quickly so we can migrate it if necessary.
3104   context_switch = 1;
3105   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3106   return next;
3107 }
3108
3109
3110 #if defined(GRAN)
3111 void 
3112 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3113 {
3114   StgBlockingQueueElement *bqe;
3115   PEs node_loc;
3116   nat len = 0; 
3117
3118   IF_GRAN_DEBUG(bq, 
3119                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3120                       node, CurrentProc, CurrentTime[CurrentProc], 
3121                       CurrentTSO->id, CurrentTSO));
3122
3123   node_loc = where_is(node);
3124
3125   ASSERT(q == END_BQ_QUEUE ||
3126          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3127          get_itbl(q)->type == CONSTR); // closure (type constructor)
3128   ASSERT(is_unique(node));
3129
3130   /* FAKE FETCH: magically copy the node to the tso's proc;
3131      no Fetch necessary because in reality the node should not have been 
3132      moved to the other PE in the first place
3133   */
3134   if (CurrentProc!=node_loc) {
3135     IF_GRAN_DEBUG(bq, 
3136                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3137                         node, node_loc, CurrentProc, CurrentTSO->id, 
3138                         // CurrentTSO, where_is(CurrentTSO),
3139                         node->header.gran.procs));
3140     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3141     IF_GRAN_DEBUG(bq, 
3142                   debugBelch("## new bitmask of node %p is %#x\n",
3143                         node, node->header.gran.procs));
3144     if (RtsFlags.GranFlags.GranSimStats.Global) {
3145       globalGranStats.tot_fake_fetches++;
3146     }
3147   }
3148
3149   bqe = q;
3150   // ToDo: check: ASSERT(CurrentProc==node_loc);
3151   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3152     //next = bqe->link;
3153     /* 
3154        bqe points to the current element in the queue
3155        next points to the next element in the queue
3156     */
3157     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3158     //tso_loc = where_is(tso);
3159     len++;
3160     bqe = unblockOne(bqe, node);
3161   }
3162
3163   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3164      the closure to make room for the anchor of the BQ */
3165   if (bqe!=END_BQ_QUEUE) {
3166     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3167     /*
3168     ASSERT((info_ptr==&RBH_Save_0_info) ||
3169            (info_ptr==&RBH_Save_1_info) ||
3170            (info_ptr==&RBH_Save_2_info));
3171     */
3172     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3173     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3174     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3175
3176     IF_GRAN_DEBUG(bq,
3177                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3178                         node, info_type(node)));
3179   }
3180
3181   /* statistics gathering */
3182   if (RtsFlags.GranFlags.GranSimStats.Global) {
3183     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3184     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3185     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3186     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3187   }
3188   IF_GRAN_DEBUG(bq,
3189                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3190                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3191 }
3192 #elif defined(PARALLEL_HASKELL)
3193 void 
3194 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3195 {
3196   StgBlockingQueueElement *bqe;
3197
3198   IF_PAR_DEBUG(verbose, 
3199                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3200                      node, mytid));
3201 #ifdef DIST  
3202   //RFP
3203   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3204     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3205     return;
3206   }
3207 #endif
3208   
3209   ASSERT(q == END_BQ_QUEUE ||
3210          get_itbl(q)->type == TSO ||           
3211          get_itbl(q)->type == BLOCKED_FETCH || 
3212          get_itbl(q)->type == CONSTR); 
3213
3214   bqe = q;
3215   while (get_itbl(bqe)->type==TSO || 
3216          get_itbl(bqe)->type==BLOCKED_FETCH) {
3217     bqe = unblockOne(bqe, node);
3218   }
3219 }
3220
3221 #else   /* !GRAN && !PARALLEL_HASKELL */
3222
3223 void
3224 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3225 {
3226     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3227                              // Exception.cmm
3228     while (tso != END_TSO_QUEUE) {
3229         tso = unblockOne(cap,tso);
3230     }
3231 }
3232 #endif
3233
3234 /* ---------------------------------------------------------------------------
3235    Interrupt execution
3236    - usually called inside a signal handler so it mustn't do anything fancy.   
3237    ------------------------------------------------------------------------ */
3238
3239 void
3240 interruptStgRts(void)
3241 {
3242     interrupted    = 1;
3243     context_switch = 1;
3244 #if defined(THREADED_RTS)
3245     prodAllCapabilities();
3246 #endif
3247 }
3248
3249 /* -----------------------------------------------------------------------------
3250    Unblock a thread
3251
3252    This is for use when we raise an exception in another thread, which
3253    may be blocked.
3254    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3255    -------------------------------------------------------------------------- */
3256
3257 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3258 /*
3259   NB: only the type of the blocking queue is different in GranSim and GUM
3260       the operations on the queue-elements are the same
3261       long live polymorphism!
3262
3263   Locks: sched_mutex is held upon entry and exit.
3264
3265 */
3266 static void
3267 unblockThread(Capability *cap, StgTSO *tso)
3268 {
3269   StgBlockingQueueElement *t, **last;
3270
3271   switch (tso->why_blocked) {
3272
3273   case NotBlocked:
3274     return;  /* not blocked */
3275
3276   case BlockedOnSTM:
3277     // Be careful: nothing to do here!  We tell the scheduler that the thread
3278     // is runnable and we leave it to the stack-walking code to abort the 
3279     // transaction while unwinding the stack.  We should perhaps have a debugging
3280     // test to make sure that this really happens and that the 'zombie' transaction
3281     // does not get committed.
3282     goto done;
3283
3284   case BlockedOnMVar:
3285     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3286     {
3287       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3288       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3289
3290       last = (StgBlockingQueueElement **)&mvar->head;
3291       for (t = (StgBlockingQueueElement *)mvar->head; 
3292            t != END_BQ_QUEUE; 
3293            last = &t->link, last_tso = t, t = t->link) {
3294         if (t == (StgBlockingQueueElement *)tso) {
3295           *last = (StgBlockingQueueElement *)tso->link;
3296           if (mvar->tail == tso) {
3297             mvar->tail = (StgTSO *)last_tso;
3298           }
3299           goto done;
3300         }
3301       }
3302       barf("unblockThread (MVAR): TSO not found");
3303     }
3304
3305   case BlockedOnBlackHole:
3306     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3307     {
3308       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3309
3310       last = &bq->blocking_queue;
3311       for (t = bq->blocking_queue; 
3312            t != END_BQ_QUEUE; 
3313            last = &t->link, t = t->link) {
3314         if (t == (StgBlockingQueueElement *)tso) {
3315           *last = (StgBlockingQueueElement *)tso->link;
3316           goto done;
3317         }
3318       }
3319       barf("unblockThread (BLACKHOLE): TSO not found");
3320     }
3321
3322   case BlockedOnException:
3323     {
3324       StgTSO *target  = tso->block_info.tso;
3325
3326       ASSERT(get_itbl(target)->type == TSO);
3327
3328       if (target->what_next == ThreadRelocated) {
3329           target = target->link;
3330           ASSERT(get_itbl(target)->type == TSO);
3331       }
3332
3333       ASSERT(target->blocked_exceptions != NULL);
3334
3335       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3336       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3337            t != END_BQ_QUEUE; 
3338            last = &t->link, t = t->link) {
3339         ASSERT(get_itbl(t)->type == TSO);
3340         if (t == (StgBlockingQueueElement *)tso) {
3341           *last = (StgBlockingQueueElement *)tso->link;
3342           goto done;
3343         }
3344       }
3345       barf("unblockThread (Exception): TSO not found");
3346     }
3347
3348   case BlockedOnRead:
3349   case BlockedOnWrite:
3350 #if defined(mingw32_HOST_OS)
3351   case BlockedOnDoProc:
3352 #endif
3353     {
3354       /* take TSO off blocked_queue */
3355       StgBlockingQueueElement *prev = NULL;
3356       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3357            prev = t, t = t->link) {
3358         if (t == (StgBlockingQueueElement *)tso) {
3359           if (prev == NULL) {
3360             blocked_queue_hd = (StgTSO *)t->link;
3361             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3362               blocked_queue_tl = END_TSO_QUEUE;
3363             }
3364           } else {
3365             prev->link = t->link;
3366             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3367               blocked_queue_tl = (StgTSO *)prev;
3368             }
3369           }
3370 #if defined(mingw32_HOST_OS)
3371           /* (Cooperatively) signal that the worker thread should abort
3372            * the request.
3373            */
3374           abandonWorkRequest(tso->block_info.async_result->reqID);
3375 #endif
3376           goto done;
3377         }
3378       }
3379       barf("unblockThread (I/O): TSO not found");
3380     }
3381
3382   case BlockedOnDelay:
3383     {
3384       /* take TSO off sleeping_queue */
3385       StgBlockingQueueElement *prev = NULL;
3386       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3387            prev = t, t = t->link) {
3388         if (t == (StgBlockingQueueElement *)tso) {
3389           if (prev == NULL) {
3390             sleeping_queue = (StgTSO *)t->link;
3391           } else {
3392             prev->link = t->link;
3393           }
3394           goto done;
3395         }
3396       }
3397       barf("unblockThread (delay): TSO not found");
3398     }
3399
3400   default:
3401     barf("unblockThread");
3402   }
3403
3404  done:
3405   tso->link = END_TSO_QUEUE;
3406   tso->why_blocked = NotBlocked;
3407   tso->block_info.closure = NULL;
3408   pushOnRunQueue(cap,tso);
3409 }
3410 #else
3411 static void
3412 unblockThread(Capability *cap, StgTSO *tso)
3413 {
3414   StgTSO *t, **last;
3415   
3416   /* To avoid locking unnecessarily. */
3417   if (tso->why_blocked == NotBlocked) {
3418     return;
3419   }
3420
3421   switch (tso->why_blocked) {
3422
3423   case BlockedOnSTM:
3424     // Be careful: nothing to do here!  We tell the scheduler that the thread
3425     // is runnable and we leave it to the stack-walking code to abort the 
3426     // transaction while unwinding the stack.  We should perhaps have a debugging
3427     // test to make sure that this really happens and that the 'zombie' transaction
3428     // does not get committed.
3429     goto done;
3430
3431   case BlockedOnMVar:
3432     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3433     {
3434       StgTSO *last_tso = END_TSO_QUEUE;
3435       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3436
3437       last = &mvar->head;
3438       for (t = mvar->head; t != END_TSO_QUEUE; 
3439            last = &t->link, last_tso = t, t = t->link) {
3440         if (t == tso) {
3441           *last = tso->link;
3442           if (mvar->tail == tso) {
3443             mvar->tail = last_tso;
3444           }
3445           goto done;
3446         }
3447       }
3448       barf("unblockThread (MVAR): TSO not found");
3449     }
3450
3451   case BlockedOnBlackHole:
3452     {
3453       last = &blackhole_queue;
3454       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3455            last = &t->link, t = t->link) {
3456         if (t == tso) {
3457           *last = tso->link;
3458           goto done;
3459         }
3460       }
3461       barf("unblockThread (BLACKHOLE): TSO not found");
3462     }
3463
3464   case BlockedOnException:
3465     {
3466       StgTSO *target  = tso->block_info.tso;
3467
3468       ASSERT(get_itbl(target)->type == TSO);
3469
3470       while (target->what_next == ThreadRelocated) {
3471           target = target->link;
3472           ASSERT(get_itbl(target)->type == TSO);
3473       }
3474       
3475       ASSERT(target->blocked_exceptions != NULL);
3476
3477       last = &target->blocked_exceptions;
3478       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3479            last = &t->link, t = t->link) {
3480         ASSERT(get_itbl(t)->type == TSO);
3481         if (t == tso) {
3482           *last = tso->link;
3483           goto done;
3484         }
3485       }
3486       barf("unblockThread (Exception): TSO not found");
3487     }
3488
3489 #if !defined(THREADED_RTS)
3490   case BlockedOnRead:
3491   case BlockedOnWrite:
3492 #if defined(mingw32_HOST_OS)
3493   case BlockedOnDoProc:
3494 #endif
3495     {
3496       StgTSO *prev = NULL;
3497       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3498            prev = t, t = t->link) {
3499         if (t == tso) {
3500           if (prev == NULL) {
3501             blocked_queue_hd = t->link;
3502             if (blocked_queue_tl == t) {
3503               blocked_queue_tl = END_TSO_QUEUE;
3504             }
3505           } else {
3506             prev->link = t->link;
3507             if (blocked_queue_tl == t) {
3508               blocked_queue_tl = prev;
3509             }
3510           }
3511 #if defined(mingw32_HOST_OS)
3512           /* (Cooperatively) signal that the worker thread should abort
3513            * the request.
3514            */
3515           abandonWorkRequest(tso->block_info.async_result->reqID);
3516 #endif
3517           goto done;
3518         }
3519       }
3520       barf("unblockThread (I/O): TSO not found");
3521     }
3522
3523   case BlockedOnDelay:
3524     {
3525       StgTSO *prev = NULL;
3526       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3527            prev = t, t = t->link) {
3528         if (t == tso) {
3529           if (prev == NULL) {
3530             sleeping_queue = t->link;
3531           } else {
3532             prev->link = t->link;
3533           }
3534           goto done;
3535         }
3536       }
3537       barf("unblockThread (delay): TSO not found");
3538     }
3539 #endif
3540
3541   default:
3542     barf("unblockThread");
3543   }
3544
3545  done:
3546   tso->link = END_TSO_QUEUE;
3547   tso->why_blocked = NotBlocked;
3548   tso->block_info.closure = NULL;
3549   appendToRunQueue(cap,tso);
3550 }
3551 #endif
3552
3553 /* -----------------------------------------------------------------------------
3554  * checkBlackHoles()
3555  *
3556  * Check the blackhole_queue for threads that can be woken up.  We do
3557  * this periodically: before every GC, and whenever the run queue is
3558  * empty.
3559  *
3560  * An elegant solution might be to just wake up all the blocked
3561  * threads with awakenBlockedQueue occasionally: they'll go back to
3562  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3563  * doesn't give us a way to tell whether we've actually managed to
3564  * wake up any threads, so we would be busy-waiting.
3565  *
3566  * -------------------------------------------------------------------------- */
3567
3568 static rtsBool
3569 checkBlackHoles (Capability *cap)
3570 {
3571     StgTSO **prev, *t;
3572     rtsBool any_woke_up = rtsFalse;
3573     StgHalfWord type;
3574
3575     // blackhole_queue is global:
3576     ASSERT_LOCK_HELD(&sched_mutex);
3577
3578     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3579
3580     // ASSUMES: sched_mutex
3581     prev = &blackhole_queue;
3582     t = blackhole_queue;
3583     while (t != END_TSO_QUEUE) {
3584         ASSERT(t->why_blocked == BlockedOnBlackHole);
3585         type = get_itbl(t->block_info.closure)->type;
3586         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3587             IF_DEBUG(sanity,checkTSO(t));
3588             t = unblockOne(cap, t);
3589             // urk, the threads migrate to the current capability
3590             // here, but we'd like to keep them on the original one.
3591             *prev = t;
3592             any_woke_up = rtsTrue;
3593         } else {
3594             prev = &t->link;
3595             t = t->link;
3596         }
3597     }
3598
3599     return any_woke_up;
3600 }
3601
3602 /* -----------------------------------------------------------------------------
3603  * raiseAsync()
3604  *
3605  * The following function implements the magic for raising an
3606  * asynchronous exception in an existing thread.
3607  *
3608  * We first remove the thread from any queue on which it might be
3609  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3610  *
3611  * We strip the stack down to the innermost CATCH_FRAME, building
3612  * thunks in the heap for all the active computations, so they can 
3613  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3614  * an application of the handler to the exception, and push it on
3615  * the top of the stack.
3616  * 
3617  * How exactly do we save all the active computations?  We create an
3618  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3619  * AP_STACKs pushes everything from the corresponding update frame
3620  * upwards onto the stack.  (Actually, it pushes everything up to the
3621  * next update frame plus a pointer to the next AP_STACK object.
3622  * Entering the next AP_STACK object pushes more onto the stack until we
3623  * reach the last AP_STACK object - at which point the stack should look
3624  * exactly as it did when we killed the TSO and we can continue
3625  * execution by entering the closure on top of the stack.
3626  *
3627  * We can also kill a thread entirely - this happens if either (a) the 
3628  * exception passed to raiseAsync is NULL, or (b) there's no
3629  * CATCH_FRAME on the stack.  In either case, we strip the entire
3630  * stack and replace the thread with a zombie.
3631  *
3632  * ToDo: in SMP mode, this function is only safe if either (a) we hold
3633  * all the Capabilities (eg. in GC), or (b) we own the Capability that
3634  * the TSO is currently blocked on or on the run queue of.
3635  *
3636  * -------------------------------------------------------------------------- */
3637  
3638 void
3639 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3640 {
3641     raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3642 }
3643
3644 void
3645 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3646 {
3647     raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3648 }
3649
3650 static void
3651 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3652             rtsBool stop_at_atomically, StgPtr stop_here)
3653 {
3654     StgRetInfoTable *info;
3655     StgPtr sp, frame;
3656     nat i;
3657   
3658     // Thread already dead?
3659     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3660         return;
3661     }
3662
3663     IF_DEBUG(scheduler, 
3664              sched_belch("raising exception in thread %ld.", (long)tso->id));
3665     
3666     // Remove it from any blocking queues
3667     unblockThread(cap,tso);
3668
3669     sp = tso->sp;
3670     
3671     // The stack freezing code assumes there's a closure pointer on
3672     // the top of the stack, so we have to arrange that this is the case...
3673     //
3674     if (sp[0] == (W_)&stg_enter_info) {
3675         sp++;
3676     } else {
3677         sp--;
3678         sp[0] = (W_)&stg_dummy_ret_closure;
3679     }
3680
3681     frame = sp + 1;
3682     while (stop_here == NULL || frame < stop_here) {
3683
3684         // 1. Let the top of the stack be the "current closure"
3685         //
3686         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3687         // CATCH_FRAME.
3688         //
3689         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3690         // current closure applied to the chunk of stack up to (but not
3691         // including) the update frame.  This closure becomes the "current
3692         // closure".  Go back to step 2.
3693         //
3694         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3695         // top of the stack applied to the exception.
3696         // 
3697         // 5. If it's a STOP_FRAME, then kill the thread.
3698         // 
3699         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3700         // transaction
3701        
3702         info = get_ret_itbl((StgClosure *)frame);
3703
3704         switch (info->i.type) {
3705
3706         case UPDATE_FRAME:
3707         {
3708             StgAP_STACK * ap;
3709             nat words;
3710             
3711             // First build an AP_STACK consisting of the stack chunk above the
3712             // current update frame, with the top word on the stack as the
3713             // fun field.
3714             //
3715             words = frame - sp - 1;
3716             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3717             
3718             ap->size = words;
3719             ap->fun  = (StgClosure *)sp[0];
3720             sp++;
3721             for(i=0; i < (nat)words; ++i) {
3722                 ap->payload[i] = (StgClosure *)*sp++;
3723             }
3724             
3725             SET_HDR(ap,&stg_AP_STACK_info,
3726                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3727             TICK_ALLOC_UP_THK(words+1,0);
3728             
3729             IF_DEBUG(scheduler,
3730                      debugBelch("sched: Updating ");
3731                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3732                      debugBelch(" with ");
3733                      printObj((StgClosure *)ap);
3734                 );
3735
3736             // Replace the updatee with an indirection
3737             //
3738             // Warning: if we're in a loop, more than one update frame on
3739             // the stack may point to the same object.  Be careful not to
3740             // overwrite an IND_OLDGEN in this case, because we'll screw
3741             // up the mutable lists.  To be on the safe side, don't
3742             // overwrite any kind of indirection at all.  See also
3743             // threadSqueezeStack in GC.c, where we have to make a similar
3744             // check.
3745             //
3746             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3747                 // revert the black hole
3748                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3749                                (StgClosure *)ap);
3750             }
3751             sp += sizeofW(StgUpdateFrame) - 1;
3752             sp[0] = (W_)ap; // push onto stack
3753             frame = sp + 1;
3754             continue; //no need to bump frame
3755         }
3756
3757         case STOP_FRAME:
3758             // We've stripped the entire stack, the thread is now dead.
3759             tso->what_next = ThreadKilled;
3760             tso->sp = frame + sizeofW(StgStopFrame);
3761             return;
3762
3763         case CATCH_FRAME:
3764             // If we find a CATCH_FRAME, and we've got an exception to raise,
3765             // then build the THUNK raise(exception), and leave it on
3766             // top of the CATCH_FRAME ready to enter.
3767             //
3768         {
3769 #ifdef PROFILING
3770             StgCatchFrame *cf = (StgCatchFrame *)frame;
3771 #endif
3772             StgThunk *raise;
3773             
3774             if (exception == NULL) break;
3775
3776             // we've got an exception to raise, so let's pass it to the
3777             // handler in this frame.
3778             //
3779             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3780             TICK_ALLOC_SE_THK(1,0);
3781             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3782             raise->payload[0] = exception;
3783             
3784             // throw away the stack from Sp up to the CATCH_FRAME.
3785             //
3786             sp = frame - 1;
3787             
3788             /* Ensure that async excpetions are blocked now, so we don't get
3789              * a surprise exception before we get around to executing the
3790              * handler.
3791              */
3792             if (tso->blocked_exceptions == NULL) {
3793                 tso->blocked_exceptions = END_TSO_QUEUE;
3794             }
3795
3796             /* Put the newly-built THUNK on top of the stack, ready to execute
3797              * when the thread restarts.
3798              */
3799             sp[0] = (W_)raise;
3800             sp[-1] = (W_)&stg_enter_info;
3801             tso->sp = sp-1;
3802             tso->what_next = ThreadRunGHC;
3803             IF_DEBUG(sanity, checkTSO(tso));
3804             return;
3805         }
3806             
3807         case ATOMICALLY_FRAME:
3808             if (stop_at_atomically) {
3809                 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3810                 stmCondemnTransaction(cap, tso -> trec);
3811 #ifdef REG_R1
3812                 tso->sp = frame;
3813 #else
3814                 // R1 is not a register: the return convention for IO in
3815                 // this case puts the return value on the stack, so we
3816                 // need to set up the stack to return to the atomically
3817                 // frame properly...
3818                 tso->sp = frame - 2;
3819                 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3820                 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3821 #endif
3822                 tso->what_next = ThreadRunGHC;
3823                 return;
3824             }
3825             // Not stop_at_atomically... fall through and abort the
3826             // transaction.
3827             
3828         case CATCH_RETRY_FRAME:
3829             // IF we find an ATOMICALLY_FRAME then we abort the
3830             // current transaction and propagate the exception.  In
3831             // this case (unlike ordinary exceptions) we do not care
3832             // whether the transaction is valid or not because its
3833             // possible validity cannot have caused the exception
3834             // and will not be visible after the abort.
3835             IF_DEBUG(stm,
3836                      debugBelch("Found atomically block delivering async exception\n"));
3837             StgTRecHeader *trec = tso -> trec;
3838             StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3839             stmAbortTransaction(cap, trec);
3840             tso -> trec = outer;
3841             break;
3842             
3843         default:
3844             break;
3845         }
3846
3847         // move on to the next stack frame
3848         frame += stack_frame_sizeW((StgClosure *)frame);
3849     }
3850
3851     // if we got here, then we stopped at stop_here
3852     ASSERT(stop_here != NULL);
3853 }
3854
3855 /* -----------------------------------------------------------------------------
3856    Deleting threads
3857
3858    This is used for interruption (^C) and forking, and corresponds to
3859    raising an exception but without letting the thread catch the
3860    exception.
3861    -------------------------------------------------------------------------- */
3862
3863 static void 
3864 deleteThread (Capability *cap, StgTSO *tso)
3865 {
3866   if (tso->why_blocked != BlockedOnCCall &&
3867       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3868       raiseAsync(cap,tso,NULL);
3869   }
3870 }
3871
3872 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3873 static void 
3874 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3875 { // for forkProcess only:
3876   // delete thread without giving it a chance to catch the KillThread exception
3877
3878   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3879       return;
3880   }
3881
3882   if (tso->why_blocked != BlockedOnCCall &&
3883       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3884       unblockThread(cap,tso);
3885   }
3886
3887   tso->what_next = ThreadKilled;
3888 }
3889 #endif
3890
3891 /* -----------------------------------------------------------------------------
3892    raiseExceptionHelper
3893    
3894    This function is called by the raise# primitve, just so that we can
3895    move some of the tricky bits of raising an exception from C-- into
3896    C.  Who knows, it might be a useful re-useable thing here too.
3897    -------------------------------------------------------------------------- */
3898
3899 StgWord
3900 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3901 {
3902     Capability *cap = regTableToCapability(reg);
3903     StgThunk *raise_closure = NULL;
3904     StgPtr p, next;
3905     StgRetInfoTable *info;
3906     //
3907     // This closure represents the expression 'raise# E' where E
3908     // is the exception raise.  It is used to overwrite all the
3909     // thunks which are currently under evaluataion.
3910     //
3911
3912     //    
3913     // LDV profiling: stg_raise_info has THUNK as its closure
3914     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3915     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3916     // 1 does not cause any problem unless profiling is performed.
3917     // However, when LDV profiling goes on, we need to linearly scan
3918     // small object pool, where raise_closure is stored, so we should
3919     // use MIN_UPD_SIZE.
3920     //
3921     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3922     //                                 sizeofW(StgClosure)+1);
3923     //
3924
3925     //
3926     // Walk up the stack, looking for the catch frame.  On the way,
3927     // we update any closures pointed to from update frames with the
3928     // raise closure that we just built.
3929     //
3930     p = tso->sp;
3931     while(1) {
3932         info = get_ret_itbl((StgClosure *)p);
3933         next = p + stack_frame_sizeW((StgClosure *)p);
3934         switch (info->i.type) {
3935             
3936         case UPDATE_FRAME:
3937             // Only create raise_closure if we need to.
3938             if (raise_closure == NULL) {
3939                 raise_closure = 
3940                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3941                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3942                 raise_closure->payload[0] = exception;
3943             }
3944             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3945             p = next;
3946             continue;
3947
3948         case ATOMICALLY_FRAME:
3949             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3950             tso->sp = p;
3951             return ATOMICALLY_FRAME;
3952             
3953         case CATCH_FRAME:
3954             tso->sp = p;
3955             return CATCH_FRAME;
3956
3957         case CATCH_STM_FRAME:
3958             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3959             tso->sp = p;
3960             return CATCH_STM_FRAME;
3961             
3962         case STOP_FRAME:
3963             tso->sp = p;
3964             return STOP_FRAME;
3965
3966         case CATCH_RETRY_FRAME:
3967         default:
3968             p = next; 
3969             continue;
3970         }
3971     }
3972 }
3973
3974
3975 /* -----------------------------------------------------------------------------
3976    findRetryFrameHelper
3977
3978    This function is called by the retry# primitive.  It traverses the stack
3979    leaving tso->sp referring to the frame which should handle the retry.  
3980
3981    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3982    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3983
3984    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3985    despite the similar implementation.
3986
3987    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3988    not be created within memory transactions.
3989    -------------------------------------------------------------------------- */
3990
3991 StgWord
3992 findRetryFrameHelper (StgTSO *tso)
3993 {
3994   StgPtr           p, next;
3995   StgRetInfoTable *info;
3996
3997   p = tso -> sp;
3998   while (1) {
3999     info = get_ret_itbl((StgClosure *)p);
4000     next = p + stack_frame_sizeW((StgClosure *)p);
4001     switch (info->i.type) {
4002       
4003     case ATOMICALLY_FRAME:
4004       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
4005       tso->sp = p;
4006       return ATOMICALLY_FRAME;
4007       
4008     case CATCH_RETRY_FRAME:
4009       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
4010       tso->sp = p;
4011       return CATCH_RETRY_FRAME;
4012       
4013     case CATCH_STM_FRAME:
4014     default:
4015       ASSERT(info->i.type != CATCH_FRAME);
4016       ASSERT(info->i.type != STOP_FRAME);
4017       p = next; 
4018       continue;
4019     }
4020   }
4021 }
4022
4023 /* -----------------------------------------------------------------------------
4024    resurrectThreads is called after garbage collection on the list of
4025    threads found to be garbage.  Each of these threads will be woken
4026    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4027    on an MVar, or NonTermination if the thread was blocked on a Black
4028    Hole.
4029
4030    Locks: assumes we hold *all* the capabilities.
4031    -------------------------------------------------------------------------- */
4032
4033 void
4034 resurrectThreads (StgTSO *threads)
4035 {
4036     StgTSO *tso, *next;
4037     Capability *cap;
4038
4039     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4040         next = tso->global_link;
4041         tso->global_link = all_threads;
4042         all_threads = tso;
4043         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4044         
4045         // Wake up the thread on the Capability it was last on for a
4046         // bound thread, or last_free_capability otherwise.
4047         if (tso->bound) {
4048             cap = tso->bound->cap;
4049         } else {
4050             cap = last_free_capability;
4051         }
4052         
4053         switch (tso->why_blocked) {
4054         case BlockedOnMVar:
4055         case BlockedOnException:
4056             /* Called by GC - sched_mutex lock is currently held. */
4057             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4058             break;
4059         case BlockedOnBlackHole:
4060             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4061             break;
4062         case BlockedOnSTM:
4063             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4064             break;
4065         case NotBlocked:
4066             /* This might happen if the thread was blocked on a black hole
4067              * belonging to a thread that we've just woken up (raiseAsync
4068              * can wake up threads, remember...).
4069              */
4070             continue;
4071         default:
4072             barf("resurrectThreads: thread blocked in a strange way");
4073         }
4074     }
4075 }
4076
4077 /* ----------------------------------------------------------------------------
4078  * Debugging: why is a thread blocked
4079  * [Also provides useful information when debugging threaded programs
4080  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4081    ------------------------------------------------------------------------- */
4082
4083 #if DEBUG
4084 static void
4085 printThreadBlockage(StgTSO *tso)
4086 {
4087   switch (tso->why_blocked) {
4088   case BlockedOnRead:
4089     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4090     break;
4091   case BlockedOnWrite:
4092     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4093     break;
4094 #if defined(mingw32_HOST_OS)
4095     case BlockedOnDoProc:
4096     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4097     break;
4098 #endif
4099   case BlockedOnDelay:
4100     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4101     break;
4102   case BlockedOnMVar:
4103     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4104     break;
4105   case BlockedOnException:
4106     debugBelch("is blocked on delivering an exception to thread %d",
4107             tso->block_info.tso->id);
4108     break;
4109   case BlockedOnBlackHole:
4110     debugBelch("is blocked on a black hole");
4111     break;
4112   case NotBlocked:
4113     debugBelch("is not blocked");
4114     break;
4115 #if defined(PARALLEL_HASKELL)
4116   case BlockedOnGA:
4117     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4118             tso->block_info.closure, info_type(tso->block_info.closure));
4119     break;
4120   case BlockedOnGA_NoSend:
4121     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4122             tso->block_info.closure, info_type(tso->block_info.closure));
4123     break;
4124 #endif
4125   case BlockedOnCCall:
4126     debugBelch("is blocked on an external call");
4127     break;
4128   case BlockedOnCCall_NoUnblockExc:
4129     debugBelch("is blocked on an external call (exceptions were already blocked)");
4130     break;
4131   case BlockedOnSTM:
4132     debugBelch("is blocked on an STM operation");
4133     break;
4134   default:
4135     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4136          tso->why_blocked, tso->id, tso);
4137   }
4138 }
4139
4140 void
4141 printThreadStatus(StgTSO *t)
4142 {
4143     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4144     {
4145       void *label = lookupThreadLabel(t->id);
4146       if (label) debugBelch("[\"%s\"] ",(char *)label);
4147     }
4148     if (t->what_next == ThreadRelocated) {
4149         debugBelch("has been relocated...\n");
4150     } else {
4151         switch (t->what_next) {
4152         case ThreadKilled:
4153             debugBelch("has been killed");
4154             break;
4155         case ThreadComplete:
4156             debugBelch("has completed");
4157             break;
4158         default:
4159             printThreadBlockage(t);
4160         }
4161         debugBelch("\n");
4162     }
4163 }
4164
4165 void
4166 printAllThreads(void)
4167 {
4168   StgTSO *t, *next;
4169   nat i;
4170   Capability *cap;
4171
4172 # if defined(GRAN)
4173   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4174   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4175                        time_string, rtsFalse/*no commas!*/);
4176
4177   debugBelch("all threads at [%s]:\n", time_string);
4178 # elif defined(PARALLEL_HASKELL)
4179   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4180   ullong_format_string(CURRENT_TIME,
4181                        time_string, rtsFalse/*no commas!*/);
4182
4183   debugBelch("all threads at [%s]:\n", time_string);
4184 # else
4185   debugBelch("all threads:\n");
4186 # endif
4187
4188   for (i = 0; i < n_capabilities; i++) {
4189       cap = &capabilities[i];
4190       debugBelch("threads on capability %d:\n", cap->no);
4191       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4192           printThreadStatus(t);
4193       }
4194   }
4195
4196   debugBelch("other threads:\n");
4197   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4198       if (t->why_blocked != NotBlocked) {
4199           printThreadStatus(t);
4200       }
4201       if (t->what_next == ThreadRelocated) {
4202           next = t->link;
4203       } else {
4204           next = t->global_link;
4205       }
4206   }
4207 }
4208
4209 // useful from gdb
4210 void 
4211 printThreadQueue(StgTSO *t)
4212 {
4213     nat i = 0;
4214     for (; t != END_TSO_QUEUE; t = t->link) {
4215         printThreadStatus(t);
4216         i++;
4217     }
4218     debugBelch("%d threads on queue\n", i);
4219 }
4220
4221 /* 
4222    Print a whole blocking queue attached to node (debugging only).
4223 */
4224 # if defined(PARALLEL_HASKELL)
4225 void 
4226 print_bq (StgClosure *node)
4227 {
4228   StgBlockingQueueElement *bqe;
4229   StgTSO *tso;
4230   rtsBool end;
4231
4232   debugBelch("## BQ of closure %p (%s): ",
4233           node, info_type(node));
4234
4235   /* should cover all closures that may have a blocking queue */
4236   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4237          get_itbl(node)->type == FETCH_ME_BQ ||
4238          get_itbl(node)->type == RBH ||
4239          get_itbl(node)->type == MVAR);
4240     
4241   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4242
4243   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4244 }
4245
4246 /* 
4247    Print a whole blocking queue starting with the element bqe.
4248 */
4249 void 
4250 print_bqe (StgBlockingQueueElement *bqe)
4251 {
4252   rtsBool end;
4253
4254   /* 
4255      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4256   */
4257   for (end = (bqe==END_BQ_QUEUE);
4258        !end; // iterate until bqe points to a CONSTR
4259        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4260        bqe = end ? END_BQ_QUEUE : bqe->link) {
4261     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4262     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4263     /* types of closures that may appear in a blocking queue */
4264     ASSERT(get_itbl(bqe)->type == TSO ||           
4265            get_itbl(bqe)->type == BLOCKED_FETCH || 
4266            get_itbl(bqe)->type == CONSTR); 
4267     /* only BQs of an RBH end with an RBH_Save closure */
4268     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4269
4270     switch (get_itbl(bqe)->type) {
4271     case TSO:
4272       debugBelch(" TSO %u (%x),",
4273               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4274       break;
4275     case BLOCKED_FETCH:
4276       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4277               ((StgBlockedFetch *)bqe)->node, 
4278               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4279               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4280               ((StgBlockedFetch *)bqe)->ga.weight);
4281       break;
4282     case CONSTR:
4283       debugBelch(" %s (IP %p),",
4284               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4285                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4286                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4287                "RBH_Save_?"), get_itbl(bqe));
4288       break;
4289     default:
4290       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4291            info_type((StgClosure *)bqe)); // , node, info_type(node));
4292       break;
4293     }
4294   } /* for */
4295   debugBelch("\n");
4296 }
4297 # elif defined(GRAN)
4298 void 
4299 print_bq (StgClosure *node)
4300 {
4301   StgBlockingQueueElement *bqe;
4302   PEs node_loc, tso_loc;
4303   rtsBool end;
4304
4305   /* should cover all closures that may have a blocking queue */
4306   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4307          get_itbl(node)->type == FETCH_ME_BQ ||
4308          get_itbl(node)->type == RBH);
4309     
4310   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4311   node_loc = where_is(node);
4312
4313   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4314           node, info_type(node), node_loc);
4315
4316   /* 
4317      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4318   */
4319   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4320        !end; // iterate until bqe points to a CONSTR
4321        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4322     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4323     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4324     /* types of closures that may appear in a blocking queue */
4325     ASSERT(get_itbl(bqe)->type == TSO ||           
4326            get_itbl(bqe)->type == CONSTR); 
4327     /* only BQs of an RBH end with an RBH_Save closure */
4328     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4329
4330     tso_loc = where_is((StgClosure *)bqe);
4331     switch (get_itbl(bqe)->type) {
4332     case TSO:
4333       debugBelch(" TSO %d (%p) on [PE %d],",
4334               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4335       break;
4336     case CONSTR:
4337       debugBelch(" %s (IP %p),",
4338               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4339                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4340                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4341                "RBH_Save_?"), get_itbl(bqe));
4342       break;
4343     default:
4344       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4345            info_type((StgClosure *)bqe), node, info_type(node));
4346       break;
4347     }
4348   } /* for */
4349   debugBelch("\n");
4350 }
4351 # endif
4352
4353 #if defined(PARALLEL_HASKELL)
4354 static nat
4355 run_queue_len(void)
4356 {
4357     nat i;
4358     StgTSO *tso;
4359     
4360     for (i=0, tso=run_queue_hd; 
4361          tso != END_TSO_QUEUE;
4362          i++, tso=tso->link) {
4363         /* nothing */
4364     }
4365         
4366     return i;
4367 }
4368 #endif
4369
4370 void
4371 sched_belch(char *s, ...)
4372 {
4373     va_list ap;
4374     va_start(ap,s);
4375 #ifdef THREADED_RTS
4376     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4377 #elif defined(PARALLEL_HASKELL)
4378     debugBelch("== ");
4379 #else
4380     debugBelch("sched: ");
4381 #endif
4382     vdebugBelch(s, ap);
4383     debugBelch("\n");
4384     va_end(ap);
4385 }
4386
4387 #endif /* DEBUG */