[project @ 2005-11-24 10:39:59 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 #ifdef THREADED_RTS
76 #define USED_WHEN_THREADED_RTS
77 #define USED_WHEN_NON_THREADED_RTS STG_UNUSED
78 #else
79 #define USED_WHEN_THREADED_RTS     STG_UNUSED
80 #define USED_WHEN_NON_THREADED_RTS
81 #endif
82
83 #ifdef SMP
84 #define USED_WHEN_SMP
85 #else
86 #define USED_WHEN_SMP STG_UNUSED
87 #endif
88
89 /* -----------------------------------------------------------------------------
90  * Global variables
91  * -------------------------------------------------------------------------- */
92
93 #if defined(GRAN)
94
95 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
96 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
97
98 /* 
99    In GranSim we have a runnable and a blocked queue for each processor.
100    In order to minimise code changes new arrays run_queue_hds/tls
101    are created. run_queue_hd is then a short cut (macro) for
102    run_queue_hds[CurrentProc] (see GranSim.h).
103    -- HWL
104 */
105 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
106 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
107 StgTSO *ccalling_threadss[MAX_PROC];
108 /* We use the same global list of threads (all_threads) in GranSim as in
109    the std RTS (i.e. we are cheating). However, we don't use this list in
110    the GranSim specific code at the moment (so we are only potentially
111    cheating).  */
112
113 #else /* !GRAN */
114
115 #if !defined(THREADED_RTS)
116 // Blocked/sleeping thrads
117 StgTSO *blocked_queue_hd = NULL;
118 StgTSO *blocked_queue_tl = NULL;
119 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
120 #endif
121
122 /* Threads blocked on blackholes.
123  * LOCK: sched_mutex+capability, or all capabilities
124  */
125 StgTSO *blackhole_queue = NULL;
126 #endif
127
128 /* The blackhole_queue should be checked for threads to wake up.  See
129  * Schedule.h for more thorough comment.
130  * LOCK: none (doesn't matter if we miss an update)
131  */
132 rtsBool blackholes_need_checking = rtsFalse;
133
134 /* Linked list of all threads.
135  * Used for detecting garbage collected threads.
136  * LOCK: sched_mutex+capability, or all capabilities
137  */
138 StgTSO *all_threads = NULL;
139
140 /* flag set by signal handler to precipitate a context switch
141  * LOCK: none (just an advisory flag)
142  */
143 int context_switch = 0;
144
145 /* flag that tracks whether we have done any execution in this time slice.
146  * LOCK: currently none, perhaps we should lock (but needs to be
147  * updated in the fast path of the scheduler).
148  */
149 nat recent_activity = ACTIVITY_YES;
150
151 /* if this flag is set as well, give up execution
152  * LOCK: none (changes once, from false->true)
153  */
154 rtsBool interrupted = rtsFalse;
155
156 /* Next thread ID to allocate.
157  * LOCK: sched_mutex
158  */
159 static StgThreadID next_thread_id = 1;
160
161 /* The smallest stack size that makes any sense is:
162  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
163  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
164  *  + 1                       (the closure to enter)
165  *  + 1                       (stg_ap_v_ret)
166  *  + 1                       (spare slot req'd by stg_ap_v_ret)
167  *
168  * A thread with this stack will bomb immediately with a stack
169  * overflow, which will increase its stack size.  
170  */
171 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
172
173 #if defined(GRAN)
174 StgTSO *CurrentTSO;
175 #endif
176
177 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
178  *  exists - earlier gccs apparently didn't.
179  *  -= chak
180  */
181 StgTSO dummy_tso;
182
183 /*
184  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
185  * in an MT setting, needed to signal that a worker thread shouldn't hang around
186  * in the scheduler when it is out of work.
187  */
188 rtsBool shutting_down_scheduler = rtsFalse;
189
190 /*
191  * This mutex protects most of the global scheduler data in
192  * the THREADED_RTS and (inc. SMP) runtime.
193  */
194 #if defined(THREADED_RTS)
195 Mutex sched_mutex;
196 #endif
197
198 #if defined(PARALLEL_HASKELL)
199 StgTSO *LastTSO;
200 rtsTime TimeOfLastYield;
201 rtsBool emitSchedule = rtsTrue;
202 #endif
203
204 /* -----------------------------------------------------------------------------
205  * static function prototypes
206  * -------------------------------------------------------------------------- */
207
208 static Capability *schedule (Capability *initialCapability, Task *task);
209
210 //
211 // These function all encapsulate parts of the scheduler loop, and are
212 // abstracted only to make the structure and control flow of the
213 // scheduler clearer.
214 //
215 static void schedulePreLoop (void);
216 #if defined(SMP)
217 static void schedulePushWork(Capability *cap, Task *task);
218 #endif
219 static void scheduleStartSignalHandlers (Capability *cap);
220 static void scheduleCheckBlockedThreads (Capability *cap);
221 static void scheduleCheckBlackHoles (Capability *cap);
222 static void scheduleDetectDeadlock (Capability *cap, Task *task);
223 #if defined(GRAN)
224 static StgTSO *scheduleProcessEvent(rtsEvent *event);
225 #endif
226 #if defined(PARALLEL_HASKELL)
227 static StgTSO *scheduleSendPendingMessages(void);
228 static void scheduleActivateSpark(void);
229 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
230 #endif
231 #if defined(PAR) || defined(GRAN)
232 static void scheduleGranParReport(void);
233 #endif
234 static void schedulePostRunThread(void);
235 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
236 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
237                                          StgTSO *t);
238 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
239                                     nat prev_what_next );
240 static void scheduleHandleThreadBlocked( StgTSO *t );
241 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
242                                              StgTSO *t );
243 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
244 static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);
245
246 static void unblockThread(Capability *cap, StgTSO *tso);
247 static rtsBool checkBlackHoles(Capability *cap);
248 static void AllRoots(evac_fn evac);
249
250 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
251
252 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
253                         rtsBool stop_at_atomically, StgPtr stop_here);
254
255 static void deleteThread (Capability *cap, StgTSO *tso);
256 static void deleteRunQueue (Capability *cap);
257
258 #ifdef DEBUG
259 static void printThreadBlockage(StgTSO *tso);
260 static void printThreadStatus(StgTSO *tso);
261 void printThreadQueue(StgTSO *tso);
262 #endif
263
264 #if defined(PARALLEL_HASKELL)
265 StgTSO * createSparkThread(rtsSpark spark);
266 StgTSO * activateSpark (rtsSpark spark);  
267 #endif
268
269 #ifdef DEBUG
270 static char *whatNext_strs[] = {
271   "(unknown)",
272   "ThreadRunGHC",
273   "ThreadInterpret",
274   "ThreadKilled",
275   "ThreadRelocated",
276   "ThreadComplete"
277 };
278 #endif
279
280 /* -----------------------------------------------------------------------------
281  * Putting a thread on the run queue: different scheduling policies
282  * -------------------------------------------------------------------------- */
283
284 STATIC_INLINE void
285 addToRunQueue( Capability *cap, StgTSO *t )
286 {
287 #if defined(PARALLEL_HASKELL)
288     if (RtsFlags.ParFlags.doFairScheduling) { 
289         // this does round-robin scheduling; good for concurrency
290         appendToRunQueue(cap,t);
291     } else {
292         // this does unfair scheduling; good for parallelism
293         pushOnRunQueue(cap,t);
294     }
295 #else
296     // this does round-robin scheduling; good for concurrency
297     appendToRunQueue(cap,t);
298 #endif
299 }
300
301 /* ---------------------------------------------------------------------------
302    Main scheduling loop.
303
304    We use round-robin scheduling, each thread returning to the
305    scheduler loop when one of these conditions is detected:
306
307       * out of heap space
308       * timer expires (thread yields)
309       * thread blocks
310       * thread ends
311       * stack overflow
312
313    GRAN version:
314      In a GranSim setup this loop iterates over the global event queue.
315      This revolves around the global event queue, which determines what 
316      to do next. Therefore, it's more complicated than either the 
317      concurrent or the parallel (GUM) setup.
318
319    GUM version:
320      GUM iterates over incoming messages.
321      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
322      and sends out a fish whenever it has nothing to do; in-between
323      doing the actual reductions (shared code below) it processes the
324      incoming messages and deals with delayed operations 
325      (see PendingFetches).
326      This is not the ugliest code you could imagine, but it's bloody close.
327
328    ------------------------------------------------------------------------ */
329
330 static Capability *
331 schedule (Capability *initialCapability, Task *task)
332 {
333   StgTSO *t;
334   Capability *cap;
335   StgThreadReturnCode ret;
336 #if defined(GRAN)
337   rtsEvent *event;
338 #elif defined(PARALLEL_HASKELL)
339   StgTSO *tso;
340   GlobalTaskId pe;
341   rtsBool receivedFinish = rtsFalse;
342 # if defined(DEBUG)
343   nat tp_size, sp_size; // stats only
344 # endif
345 #endif
346   nat prev_what_next;
347   rtsBool ready_to_gc;
348 #if defined(THREADED_RTS)
349   rtsBool first = rtsTrue;
350 #endif
351   
352   cap = initialCapability;
353
354   // Pre-condition: this task owns initialCapability.
355   // The sched_mutex is *NOT* held
356   // NB. on return, we still hold a capability.
357
358   IF_DEBUG(scheduler,
359            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
360                        task, initialCapability);
361       );
362
363   schedulePreLoop();
364
365   // -----------------------------------------------------------
366   // Scheduler loop starts here:
367
368 #if defined(PARALLEL_HASKELL)
369 #define TERMINATION_CONDITION        (!receivedFinish)
370 #elif defined(GRAN)
371 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
372 #else
373 #define TERMINATION_CONDITION        rtsTrue
374 #endif
375
376   while (TERMINATION_CONDITION) {
377
378 #if defined(GRAN)
379       /* Choose the processor with the next event */
380       CurrentProc = event->proc;
381       CurrentTSO = event->tso;
382 #endif
383
384 #if defined(THREADED_RTS)
385       if (first) {
386           // don't yield the first time, we want a chance to run this
387           // thread for a bit, even if there are others banging at the
388           // door.
389           first = rtsFalse;
390           ASSERT_CAPABILITY_INVARIANTS(cap,task);
391       } else {
392           // Yield the capability to higher-priority tasks if necessary.
393           yieldCapability(&cap, task);
394       }
395 #endif
396       
397 #ifdef SMP
398       schedulePushWork(cap,task);
399 #endif
400
401     // Check whether we have re-entered the RTS from Haskell without
402     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
403     // call).
404     if (cap->in_haskell) {
405           errorBelch("schedule: re-entered unsafely.\n"
406                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
407           stg_exit(EXIT_FAILURE);
408     }
409
410     //
411     // Test for interruption.  If interrupted==rtsTrue, then either
412     // we received a keyboard interrupt (^C), or the scheduler is
413     // trying to shut down all the tasks (shutting_down_scheduler) in
414     // the threaded RTS.
415     //
416     if (interrupted) {
417         deleteRunQueue(cap);
418 #if defined(SMP)
419         discardSparksCap(cap);
420 #endif
421         if (shutting_down_scheduler) {
422             IF_DEBUG(scheduler, sched_belch("shutting down"));
423             // If we are a worker, just exit.  If we're a bound thread
424             // then we will exit below when we've removed our TSO from
425             // the run queue.
426             if (task->tso == NULL && emptyRunQueue(cap)) {
427                 return cap;
428             }
429         } else {
430             IF_DEBUG(scheduler, sched_belch("interrupted"));
431         }
432     }
433
434 #if defined(SMP)
435     // If the run queue is empty, take a spark and turn it into a thread.
436     {
437         if (emptyRunQueue(cap)) {
438             StgClosure *spark;
439             spark = findSpark(cap);
440             if (spark != NULL) {
441                 IF_DEBUG(scheduler,
442                          sched_belch("turning spark of closure %p into a thread",
443                                      (StgClosure *)spark));
444                 createSparkThread(cap,spark);     
445             }
446         }
447     }
448 #endif // SMP
449
450     scheduleStartSignalHandlers(cap);
451
452     // Only check the black holes here if we've nothing else to do.
453     // During normal execution, the black hole list only gets checked
454     // at GC time, to avoid repeatedly traversing this possibly long
455     // list each time around the scheduler.
456     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
457
458     scheduleCheckBlockedThreads(cap);
459
460     scheduleDetectDeadlock(cap,task);
461
462     // Normally, the only way we can get here with no threads to
463     // run is if a keyboard interrupt received during 
464     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
465     // Additionally, it is not fatal for the
466     // threaded RTS to reach here with no threads to run.
467     //
468     // win32: might be here due to awaitEvent() being abandoned
469     // as a result of a console event having been delivered.
470     if ( emptyRunQueue(cap) ) {
471 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
472         ASSERT(interrupted);
473 #endif
474         continue; // nothing to do
475     }
476
477 #if defined(PARALLEL_HASKELL)
478     scheduleSendPendingMessages();
479     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
480         continue;
481
482 #if defined(SPARKS)
483     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
484 #endif
485
486     /* If we still have no work we need to send a FISH to get a spark
487        from another PE */
488     if (emptyRunQueue(cap)) {
489         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
490         ASSERT(rtsFalse); // should not happen at the moment
491     }
492     // from here: non-empty run queue.
493     //  TODO: merge above case with this, only one call processMessages() !
494     if (PacketsWaiting()) {  /* process incoming messages, if
495                                 any pending...  only in else
496                                 because getRemoteWork waits for
497                                 messages as well */
498         receivedFinish = processMessages();
499     }
500 #endif
501
502 #if defined(GRAN)
503     scheduleProcessEvent(event);
504 #endif
505
506     // 
507     // Get a thread to run
508     //
509     t = popRunQueue(cap);
510
511 #if defined(GRAN) || defined(PAR)
512     scheduleGranParReport(); // some kind of debuging output
513 #else
514     // Sanity check the thread we're about to run.  This can be
515     // expensive if there is lots of thread switching going on...
516     IF_DEBUG(sanity,checkTSO(t));
517 #endif
518
519 #if defined(THREADED_RTS)
520     // Check whether we can run this thread in the current task.
521     // If not, we have to pass our capability to the right task.
522     {
523         Task *bound = t->bound;
524       
525         if (bound) {
526             if (bound == task) {
527                 IF_DEBUG(scheduler,
528                          sched_belch("### Running thread %d in bound thread",
529                                      t->id));
530                 // yes, the Haskell thread is bound to the current native thread
531             } else {
532                 IF_DEBUG(scheduler,
533                          sched_belch("### thread %d bound to another OS thread",
534                                      t->id));
535                 // no, bound to a different Haskell thread: pass to that thread
536                 pushOnRunQueue(cap,t);
537                 continue;
538             }
539         } else {
540             // The thread we want to run is unbound.
541             if (task->tso) { 
542                 IF_DEBUG(scheduler,
543                          sched_belch("### this OS thread cannot run thread %d", t->id));
544                 // no, the current native thread is bound to a different
545                 // Haskell thread, so pass it to any worker thread
546                 pushOnRunQueue(cap,t);
547                 continue; 
548             }
549         }
550     }
551 #endif
552
553     cap->r.rCurrentTSO = t;
554     
555     /* context switches are initiated by the timer signal, unless
556      * the user specified "context switch as often as possible", with
557      * +RTS -C0
558      */
559     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
560         && !emptyThreadQueues(cap)) {
561         context_switch = 1;
562     }
563          
564 run_thread:
565
566     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
567                               (long)t->id, whatNext_strs[t->what_next]));
568
569 #if defined(PROFILING)
570     startHeapProfTimer();
571 #endif
572
573     // ----------------------------------------------------------------------
574     // Run the current thread 
575
576     prev_what_next = t->what_next;
577
578     errno = t->saved_errno;
579     cap->in_haskell = rtsTrue;
580
581     recent_activity = ACTIVITY_YES;
582
583     switch (prev_what_next) {
584         
585     case ThreadKilled:
586     case ThreadComplete:
587         /* Thread already finished, return to scheduler. */
588         ret = ThreadFinished;
589         break;
590         
591     case ThreadRunGHC:
592     {
593         StgRegTable *r;
594         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
595         cap = regTableToCapability(r);
596         ret = r->rRet;
597         break;
598     }
599     
600     case ThreadInterpret:
601         cap = interpretBCO(cap);
602         ret = cap->r.rRet;
603         break;
604         
605     default:
606         barf("schedule: invalid what_next field");
607     }
608
609     cap->in_haskell = rtsFalse;
610
611     // The TSO might have moved, eg. if it re-entered the RTS and a GC
612     // happened.  So find the new location:
613     t = cap->r.rCurrentTSO;
614
615     // We have run some Haskell code: there might be blackhole-blocked
616     // threads to wake up now.
617     // Lock-free test here should be ok, we're just setting a flag.
618     if ( blackhole_queue != END_TSO_QUEUE ) {
619         blackholes_need_checking = rtsTrue;
620     }
621     
622     // And save the current errno in this thread.
623     // XXX: possibly bogus for SMP because this thread might already
624     // be running again, see code below.
625     t->saved_errno = errno;
626
627 #ifdef SMP
628     // If ret is ThreadBlocked, and this Task is bound to the TSO that
629     // blocked, we are in limbo - the TSO is now owned by whatever it
630     // is blocked on, and may in fact already have been woken up,
631     // perhaps even on a different Capability.  It may be the case
632     // that task->cap != cap.  We better yield this Capability
633     // immediately and return to normaility.
634     if (ret == ThreadBlocked) {
635         IF_DEBUG(scheduler,
636                  debugBelch("--<< thread %d (%s) stopped: blocked\n",
637                             t->id, whatNext_strs[t->what_next]));
638         continue;
639     }
640 #endif
641
642     ASSERT_CAPABILITY_INVARIANTS(cap,task);
643
644     // ----------------------------------------------------------------------
645     
646     // Costs for the scheduler are assigned to CCS_SYSTEM
647 #if defined(PROFILING)
648     stopHeapProfTimer();
649     CCCS = CCS_SYSTEM;
650 #endif
651     
652 #if defined(THREADED_RTS)
653     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
654 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
655     IF_DEBUG(scheduler,debugBelch("sched: "););
656 #endif
657     
658     schedulePostRunThread();
659
660     ready_to_gc = rtsFalse;
661
662     switch (ret) {
663     case HeapOverflow:
664         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
665         break;
666
667     case StackOverflow:
668         scheduleHandleStackOverflow(cap,task,t);
669         break;
670
671     case ThreadYielding:
672         if (scheduleHandleYield(cap, t, prev_what_next)) {
673             // shortcut for switching between compiler/interpreter:
674             goto run_thread; 
675         }
676         break;
677
678     case ThreadBlocked:
679         scheduleHandleThreadBlocked(t);
680         break;
681
682     case ThreadFinished:
683         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
684         ASSERT_CAPABILITY_INVARIANTS(cap,task);
685         break;
686
687     default:
688       barf("schedule: invalid thread return code %d", (int)ret);
689     }
690
691     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
692     if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
693   } /* end of while() */
694
695   IF_PAR_DEBUG(verbose,
696                debugBelch("== Leaving schedule() after having received Finish\n"));
697 }
698
699 /* ----------------------------------------------------------------------------
700  * Setting up the scheduler loop
701  * ------------------------------------------------------------------------- */
702
703 static void
704 schedulePreLoop(void)
705 {
706 #if defined(GRAN) 
707     /* set up first event to get things going */
708     /* ToDo: assign costs for system setup and init MainTSO ! */
709     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
710               ContinueThread, 
711               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
712     
713     IF_DEBUG(gran,
714              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
715                         CurrentTSO);
716              G_TSO(CurrentTSO, 5));
717     
718     if (RtsFlags.GranFlags.Light) {
719         /* Save current time; GranSim Light only */
720         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
721     }      
722 #endif
723 }
724
725 /* -----------------------------------------------------------------------------
726  * schedulePushWork()
727  *
728  * Push work to other Capabilities if we have some.
729  * -------------------------------------------------------------------------- */
730
731 #ifdef SMP
732 static void
733 schedulePushWork(Capability *cap USED_WHEN_SMP, 
734                  Task *task      USED_WHEN_SMP)
735 {
736     Capability *free_caps[n_capabilities], *cap0;
737     nat i, n_free_caps;
738
739     // Check whether we have more threads on our run queue, or sparks
740     // in our pool, that we could hand to another Capability.
741     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
742         && sparkPoolSizeCap(cap) < 2) {
743         return;
744     }
745
746     // First grab as many free Capabilities as we can.
747     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
748         cap0 = &capabilities[i];
749         if (cap != cap0 && tryGrabCapability(cap0,task)) {
750             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
751                 // it already has some work, we just grabbed it at 
752                 // the wrong moment.  Or maybe it's deadlocked!
753                 releaseCapability(cap0);
754             } else {
755                 free_caps[n_free_caps++] = cap0;
756             }
757         }
758     }
759
760     // we now have n_free_caps free capabilities stashed in
761     // free_caps[].  Share our run queue equally with them.  This is
762     // probably the simplest thing we could do; improvements we might
763     // want to do include:
764     //
765     //   - giving high priority to moving relatively new threads, on 
766     //     the gournds that they haven't had time to build up a
767     //     working set in the cache on this CPU/Capability.
768     //
769     //   - giving low priority to moving long-lived threads
770
771     if (n_free_caps > 0) {
772         StgTSO *prev, *t, *next;
773         rtsBool pushed_to_all;
774
775         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
776
777         i = 0;
778         pushed_to_all = rtsFalse;
779
780         if (cap->run_queue_hd != END_TSO_QUEUE) {
781             prev = cap->run_queue_hd;
782             t = prev->link;
783             prev->link = END_TSO_QUEUE;
784             for (; t != END_TSO_QUEUE; t = next) {
785                 next = t->link;
786                 t->link = END_TSO_QUEUE;
787                 if (t->what_next == ThreadRelocated
788                     || t->bound == task) { // don't move my bound thread
789                     prev->link = t;
790                     prev = t;
791                 } else if (i == n_free_caps) {
792                     pushed_to_all = rtsTrue;
793                     i = 0;
794                     // keep one for us
795                     prev->link = t;
796                     prev = t;
797                 } else {
798                     appendToRunQueue(free_caps[i],t);
799                     if (t->bound) { t->bound->cap = free_caps[i]; }
800                     i++;
801                 }
802             }
803             cap->run_queue_tl = prev;
804         }
805
806         // If there are some free capabilities that we didn't push any
807         // threads to, then try to push a spark to each one.
808         if (!pushed_to_all) {
809             StgClosure *spark;
810             // i is the next free capability to push to
811             for (; i < n_free_caps; i++) {
812                 if (emptySparkPoolCap(free_caps[i])) {
813                     spark = findSpark(cap);
814                     if (spark != NULL) {
815                         IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
816                         newSpark(&(free_caps[i]->r), spark);
817                     }
818                 }
819             }
820         }
821
822         // release the capabilities
823         for (i = 0; i < n_free_caps; i++) {
824             task->cap = free_caps[i];
825             releaseCapability(free_caps[i]);
826         }
827     }
828     task->cap = cap; // reset to point to our Capability.
829 }
830 #endif
831
832 /* ----------------------------------------------------------------------------
833  * Start any pending signal handlers
834  * ------------------------------------------------------------------------- */
835
836 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
837 static void
838 scheduleStartSignalHandlers(Capability *cap)
839 {
840     if (signals_pending()) { // safe outside the lock
841         startSignalHandlers(cap);
842     }
843 }
844 #else
845 static void
846 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
847 {
848 }
849 #endif
850
851 /* ----------------------------------------------------------------------------
852  * Check for blocked threads that can be woken up.
853  * ------------------------------------------------------------------------- */
854
855 static void
856 scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
857 {
858 #if !defined(THREADED_RTS)
859     //
860     // Check whether any waiting threads need to be woken up.  If the
861     // run queue is empty, and there are no other tasks running, we
862     // can wait indefinitely for something to happen.
863     //
864     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
865     {
866         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
867     }
868 #endif
869 }
870
871
872 /* ----------------------------------------------------------------------------
873  * Check for threads blocked on BLACKHOLEs that can be woken up
874  * ------------------------------------------------------------------------- */
875 static void
876 scheduleCheckBlackHoles (Capability *cap)
877 {
878     if ( blackholes_need_checking ) // check without the lock first
879     {
880         ACQUIRE_LOCK(&sched_mutex);
881         if ( blackholes_need_checking ) {
882             checkBlackHoles(cap);
883             blackholes_need_checking = rtsFalse;
884         }
885         RELEASE_LOCK(&sched_mutex);
886     }
887 }
888
889 /* ----------------------------------------------------------------------------
890  * Detect deadlock conditions and attempt to resolve them.
891  * ------------------------------------------------------------------------- */
892
893 static void
894 scheduleDetectDeadlock (Capability *cap, Task *task)
895 {
896
897 #if defined(PARALLEL_HASKELL)
898     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
899     return;
900 #endif
901
902     /* 
903      * Detect deadlock: when we have no threads to run, there are no
904      * threads blocked, waiting for I/O, or sleeping, and all the
905      * other tasks are waiting for work, we must have a deadlock of
906      * some description.
907      */
908     if ( emptyThreadQueues(cap) )
909     {
910 #if defined(THREADED_RTS)
911         /* 
912          * In the threaded RTS, we only check for deadlock if there
913          * has been no activity in a complete timeslice.  This means
914          * we won't eagerly start a full GC just because we don't have
915          * any threads to run currently.
916          */
917         if (recent_activity != ACTIVITY_INACTIVE) return;
918 #endif
919
920         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
921
922         // Garbage collection can release some new threads due to
923         // either (a) finalizers or (b) threads resurrected because
924         // they are unreachable and will therefore be sent an
925         // exception.  Any threads thus released will be immediately
926         // runnable.
927         scheduleDoGC( cap, task, rtsTrue/*force  major GC*/ );
928         recent_activity = ACTIVITY_DONE_GC;
929         
930         if ( !emptyRunQueue(cap) ) return;
931
932 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
933         /* If we have user-installed signal handlers, then wait
934          * for signals to arrive rather then bombing out with a
935          * deadlock.
936          */
937         if ( anyUserHandlers() ) {
938             IF_DEBUG(scheduler, 
939                      sched_belch("still deadlocked, waiting for signals..."));
940
941             awaitUserSignals();
942
943             if (signals_pending()) {
944                 startSignalHandlers(cap);
945             }
946
947             // either we have threads to run, or we were interrupted:
948             ASSERT(!emptyRunQueue(cap) || interrupted);
949         }
950 #endif
951
952 #if !defined(THREADED_RTS)
953         /* Probably a real deadlock.  Send the current main thread the
954          * Deadlock exception.
955          */
956         if (task->tso) {
957             switch (task->tso->why_blocked) {
958             case BlockedOnSTM:
959             case BlockedOnBlackHole:
960             case BlockedOnException:
961             case BlockedOnMVar:
962                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
963                 return;
964             default:
965                 barf("deadlock: main thread blocked in a strange way");
966             }
967         }
968         return;
969 #endif
970     }
971 }
972
973 /* ----------------------------------------------------------------------------
974  * Process an event (GRAN only)
975  * ------------------------------------------------------------------------- */
976
977 #if defined(GRAN)
978 static StgTSO *
979 scheduleProcessEvent(rtsEvent *event)
980 {
981     StgTSO *t;
982
983     if (RtsFlags.GranFlags.Light)
984       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
985
986     /* adjust time based on time-stamp */
987     if (event->time > CurrentTime[CurrentProc] &&
988         event->evttype != ContinueThread)
989       CurrentTime[CurrentProc] = event->time;
990     
991     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
992     if (!RtsFlags.GranFlags.Light)
993       handleIdlePEs();
994
995     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
996
997     /* main event dispatcher in GranSim */
998     switch (event->evttype) {
999       /* Should just be continuing execution */
1000     case ContinueThread:
1001       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1002       /* ToDo: check assertion
1003       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1004              run_queue_hd != END_TSO_QUEUE);
1005       */
1006       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1007       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1008           procStatus[CurrentProc]==Fetching) {
1009         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1010               CurrentTSO->id, CurrentTSO, CurrentProc);
1011         goto next_thread;
1012       } 
1013       /* Ignore ContinueThreads for completed threads */
1014       if (CurrentTSO->what_next == ThreadComplete) {
1015         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1016               CurrentTSO->id, CurrentTSO, CurrentProc);
1017         goto next_thread;
1018       } 
1019       /* Ignore ContinueThreads for threads that are being migrated */
1020       if (PROCS(CurrentTSO)==Nowhere) { 
1021         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1022               CurrentTSO->id, CurrentTSO, CurrentProc);
1023         goto next_thread;
1024       }
1025       /* The thread should be at the beginning of the run queue */
1026       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1027         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1028               CurrentTSO->id, CurrentTSO, CurrentProc);
1029         break; // run the thread anyway
1030       }
1031       /*
1032       new_event(proc, proc, CurrentTime[proc],
1033                 FindWork,
1034                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1035       goto next_thread; 
1036       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1037       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1038
1039     case FetchNode:
1040       do_the_fetchnode(event);
1041       goto next_thread;             /* handle next event in event queue  */
1042       
1043     case GlobalBlock:
1044       do_the_globalblock(event);
1045       goto next_thread;             /* handle next event in event queue  */
1046       
1047     case FetchReply:
1048       do_the_fetchreply(event);
1049       goto next_thread;             /* handle next event in event queue  */
1050       
1051     case UnblockThread:   /* Move from the blocked queue to the tail of */
1052       do_the_unblock(event);
1053       goto next_thread;             /* handle next event in event queue  */
1054       
1055     case ResumeThread:  /* Move from the blocked queue to the tail of */
1056       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1057       event->tso->gran.blocktime += 
1058         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1059       do_the_startthread(event);
1060       goto next_thread;             /* handle next event in event queue  */
1061       
1062     case StartThread:
1063       do_the_startthread(event);
1064       goto next_thread;             /* handle next event in event queue  */
1065       
1066     case MoveThread:
1067       do_the_movethread(event);
1068       goto next_thread;             /* handle next event in event queue  */
1069       
1070     case MoveSpark:
1071       do_the_movespark(event);
1072       goto next_thread;             /* handle next event in event queue  */
1073       
1074     case FindWork:
1075       do_the_findwork(event);
1076       goto next_thread;             /* handle next event in event queue  */
1077       
1078     default:
1079       barf("Illegal event type %u\n", event->evttype);
1080     }  /* switch */
1081     
1082     /* This point was scheduler_loop in the old RTS */
1083
1084     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1085
1086     TimeOfLastEvent = CurrentTime[CurrentProc];
1087     TimeOfNextEvent = get_time_of_next_event();
1088     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1089     // CurrentTSO = ThreadQueueHd;
1090
1091     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1092                          TimeOfNextEvent));
1093
1094     if (RtsFlags.GranFlags.Light) 
1095       GranSimLight_leave_system(event, &ActiveTSO); 
1096
1097     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1098
1099     IF_DEBUG(gran, 
1100              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1101
1102     /* in a GranSim setup the TSO stays on the run queue */
1103     t = CurrentTSO;
1104     /* Take a thread from the run queue. */
1105     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1106
1107     IF_DEBUG(gran, 
1108              debugBelch("GRAN: About to run current thread, which is\n");
1109              G_TSO(t,5));
1110
1111     context_switch = 0; // turned on via GranYield, checking events and time slice
1112
1113     IF_DEBUG(gran, 
1114              DumpGranEvent(GR_SCHEDULE, t));
1115
1116     procStatus[CurrentProc] = Busy;
1117 }
1118 #endif // GRAN
1119
1120 /* ----------------------------------------------------------------------------
1121  * Send pending messages (PARALLEL_HASKELL only)
1122  * ------------------------------------------------------------------------- */
1123
1124 #if defined(PARALLEL_HASKELL)
1125 static StgTSO *
1126 scheduleSendPendingMessages(void)
1127 {
1128     StgSparkPool *pool;
1129     rtsSpark spark;
1130     StgTSO *t;
1131
1132 # if defined(PAR) // global Mem.Mgmt., omit for now
1133     if (PendingFetches != END_BF_QUEUE) {
1134         processFetches();
1135     }
1136 # endif
1137     
1138     if (RtsFlags.ParFlags.BufferTime) {
1139         // if we use message buffering, we must send away all message
1140         // packets which have become too old...
1141         sendOldBuffers(); 
1142     }
1143 }
1144 #endif
1145
1146 /* ----------------------------------------------------------------------------
1147  * Activate spark threads (PARALLEL_HASKELL only)
1148  * ------------------------------------------------------------------------- */
1149
1150 #if defined(PARALLEL_HASKELL)
1151 static void
1152 scheduleActivateSpark(void)
1153 {
1154 #if defined(SPARKS)
1155   ASSERT(emptyRunQueue());
1156 /* We get here if the run queue is empty and want some work.
1157    We try to turn a spark into a thread, and add it to the run queue,
1158    from where it will be picked up in the next iteration of the scheduler
1159    loop.
1160 */
1161
1162       /* :-[  no local threads => look out for local sparks */
1163       /* the spark pool for the current PE */
1164       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1165       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1166           pool->hd < pool->tl) {
1167         /* 
1168          * ToDo: add GC code check that we really have enough heap afterwards!!
1169          * Old comment:
1170          * If we're here (no runnable threads) and we have pending
1171          * sparks, we must have a space problem.  Get enough space
1172          * to turn one of those pending sparks into a
1173          * thread... 
1174          */
1175
1176         spark = findSpark(rtsFalse);            /* get a spark */
1177         if (spark != (rtsSpark) NULL) {
1178           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1179           IF_PAR_DEBUG(fish, // schedule,
1180                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1181                              tso->id, tso, advisory_thread_count));
1182
1183           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1184             IF_PAR_DEBUG(fish, // schedule,
1185                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1186                             spark));
1187             return rtsFalse; /* failed to generate a thread */
1188           }                  /* otherwise fall through & pick-up new tso */
1189         } else {
1190           IF_PAR_DEBUG(fish, // schedule,
1191                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1192                              spark_queue_len(pool)));
1193           return rtsFalse;  /* failed to generate a thread */
1194         }
1195         return rtsTrue;  /* success in generating a thread */
1196   } else { /* no more threads permitted or pool empty */
1197     return rtsFalse;  /* failed to generateThread */
1198   }
1199 #else
1200   tso = NULL; // avoid compiler warning only
1201   return rtsFalse;  /* dummy in non-PAR setup */
1202 #endif // SPARKS
1203 }
1204 #endif // PARALLEL_HASKELL
1205
1206 /* ----------------------------------------------------------------------------
1207  * Get work from a remote node (PARALLEL_HASKELL only)
1208  * ------------------------------------------------------------------------- */
1209     
1210 #if defined(PARALLEL_HASKELL)
1211 static rtsBool
1212 scheduleGetRemoteWork(rtsBool *receivedFinish)
1213 {
1214   ASSERT(emptyRunQueue());
1215
1216   if (RtsFlags.ParFlags.BufferTime) {
1217         IF_PAR_DEBUG(verbose, 
1218                 debugBelch("...send all pending data,"));
1219         {
1220           nat i;
1221           for (i=1; i<=nPEs; i++)
1222             sendImmediately(i); // send all messages away immediately
1223         }
1224   }
1225 # ifndef SPARKS
1226         //++EDEN++ idle() , i.e. send all buffers, wait for work
1227         // suppress fishing in EDEN... just look for incoming messages
1228         // (blocking receive)
1229   IF_PAR_DEBUG(verbose, 
1230                debugBelch("...wait for incoming messages...\n"));
1231   *receivedFinish = processMessages(); // blocking receive...
1232
1233         // and reenter scheduling loop after having received something
1234         // (return rtsFalse below)
1235
1236 # else /* activate SPARKS machinery */
1237 /* We get here, if we have no work, tried to activate a local spark, but still
1238    have no work. We try to get a remote spark, by sending a FISH message.
1239    Thread migration should be added here, and triggered when a sequence of 
1240    fishes returns without work. */
1241         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1242
1243       /* =8-[  no local sparks => look for work on other PEs */
1244         /*
1245          * We really have absolutely no work.  Send out a fish
1246          * (there may be some out there already), and wait for
1247          * something to arrive.  We clearly can't run any threads
1248          * until a SCHEDULE or RESUME arrives, and so that's what
1249          * we're hoping to see.  (Of course, we still have to
1250          * respond to other types of messages.)
1251          */
1252         rtsTime now = msTime() /*CURRENT_TIME*/;
1253         IF_PAR_DEBUG(verbose, 
1254                      debugBelch("--  now=%ld\n", now));
1255         IF_PAR_DEBUG(fish, // verbose,
1256              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1257                  (last_fish_arrived_at!=0 &&
1258                   last_fish_arrived_at+delay > now)) {
1259                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1260                      now, last_fish_arrived_at+delay, 
1261                      last_fish_arrived_at,
1262                      delay);
1263              });
1264   
1265         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1266             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1267           if (last_fish_arrived_at==0 ||
1268               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1269             /* outstandingFishes is set in sendFish, processFish;
1270                avoid flooding system with fishes via delay */
1271     next_fish_to_send_at = 0;  
1272   } else {
1273     /* ToDo: this should be done in the main scheduling loop to avoid the
1274              busy wait here; not so bad if fish delay is very small  */
1275     int iq = 0; // DEBUGGING -- HWL
1276     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1277     /* send a fish when ready, but process messages that arrive in the meantime */
1278     do {
1279       if (PacketsWaiting()) {
1280         iq++; // DEBUGGING
1281         *receivedFinish = processMessages();
1282       }
1283       now = msTime();
1284     } while (!*receivedFinish || now<next_fish_to_send_at);
1285     // JB: This means the fish could become obsolete, if we receive
1286     // work. Better check for work again? 
1287     // last line: while (!receivedFinish || !haveWork || now<...)
1288     // next line: if (receivedFinish || haveWork )
1289
1290     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1291       return rtsFalse;  // NB: this will leave scheduler loop
1292                         // immediately after return!
1293                           
1294     IF_PAR_DEBUG(fish, // verbose,
1295                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1296
1297   }
1298
1299     // JB: IMHO, this should all be hidden inside sendFish(...)
1300     /* pe = choosePE(); 
1301        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1302                 NEW_FISH_HUNGER);
1303
1304     // Global statistics: count no. of fishes
1305     if (RtsFlags.ParFlags.ParStats.Global &&
1306          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1307            globalParStats.tot_fish_mess++;
1308            }
1309     */ 
1310
1311   /* delayed fishes must have been sent by now! */
1312   next_fish_to_send_at = 0;  
1313   }
1314       
1315   *receivedFinish = processMessages();
1316 # endif /* SPARKS */
1317
1318  return rtsFalse;
1319  /* NB: this function always returns rtsFalse, meaning the scheduler
1320     loop continues with the next iteration; 
1321     rationale: 
1322       return code means success in finding work; we enter this function
1323       if there is no local work, thus have to send a fish which takes
1324       time until it arrives with work; in the meantime we should process
1325       messages in the main loop;
1326  */
1327 }
1328 #endif // PARALLEL_HASKELL
1329
1330 /* ----------------------------------------------------------------------------
1331  * PAR/GRAN: Report stats & debugging info(?)
1332  * ------------------------------------------------------------------------- */
1333
1334 #if defined(PAR) || defined(GRAN)
1335 static void
1336 scheduleGranParReport(void)
1337 {
1338   ASSERT(run_queue_hd != END_TSO_QUEUE);
1339
1340   /* Take a thread from the run queue, if we have work */
1341   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1342
1343     /* If this TSO has got its outport closed in the meantime, 
1344      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1345      * It has to be marked as TH_DEAD for this purpose.
1346      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1347
1348 JB: TODO: investigate wether state change field could be nuked
1349      entirely and replaced by the normal tso state (whatnext
1350      field). All we want to do is to kill tsos from outside.
1351      */
1352
1353     /* ToDo: write something to the log-file
1354     if (RTSflags.ParFlags.granSimStats && !sameThread)
1355         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1356
1357     CurrentTSO = t;
1358     */
1359     /* the spark pool for the current PE */
1360     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1361
1362     IF_DEBUG(scheduler, 
1363              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1364                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1365
1366     IF_PAR_DEBUG(fish,
1367              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1368                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1369
1370     if (RtsFlags.ParFlags.ParStats.Full && 
1371         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1372         (emitSchedule || // forced emit
1373          (t && LastTSO && t->id != LastTSO->id))) {
1374       /* 
1375          we are running a different TSO, so write a schedule event to log file
1376          NB: If we use fair scheduling we also have to write  a deschedule 
1377              event for LastTSO; with unfair scheduling we know that the
1378              previous tso has blocked whenever we switch to another tso, so
1379              we don't need it in GUM for now
1380       */
1381       IF_PAR_DEBUG(fish, // schedule,
1382                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1383
1384       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1385                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1386       emitSchedule = rtsFalse;
1387     }
1388 }     
1389 #endif
1390
1391 /* ----------------------------------------------------------------------------
1392  * After running a thread...
1393  * ------------------------------------------------------------------------- */
1394
1395 static void
1396 schedulePostRunThread(void)
1397 {
1398 #if defined(PAR)
1399     /* HACK 675: if the last thread didn't yield, make sure to print a 
1400        SCHEDULE event to the log file when StgRunning the next thread, even
1401        if it is the same one as before */
1402     LastTSO = t; 
1403     TimeOfLastYield = CURRENT_TIME;
1404 #endif
1405
1406   /* some statistics gathering in the parallel case */
1407
1408 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1409   switch (ret) {
1410     case HeapOverflow:
1411 # if defined(GRAN)
1412       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1413       globalGranStats.tot_heapover++;
1414 # elif defined(PAR)
1415       globalParStats.tot_heapover++;
1416 # endif
1417       break;
1418
1419      case StackOverflow:
1420 # if defined(GRAN)
1421       IF_DEBUG(gran, 
1422                DumpGranEvent(GR_DESCHEDULE, t));
1423       globalGranStats.tot_stackover++;
1424 # elif defined(PAR)
1425       // IF_DEBUG(par, 
1426       // DumpGranEvent(GR_DESCHEDULE, t);
1427       globalParStats.tot_stackover++;
1428 # endif
1429       break;
1430
1431     case ThreadYielding:
1432 # if defined(GRAN)
1433       IF_DEBUG(gran, 
1434                DumpGranEvent(GR_DESCHEDULE, t));
1435       globalGranStats.tot_yields++;
1436 # elif defined(PAR)
1437       // IF_DEBUG(par, 
1438       // DumpGranEvent(GR_DESCHEDULE, t);
1439       globalParStats.tot_yields++;
1440 # endif
1441       break; 
1442
1443     case ThreadBlocked:
1444 # if defined(GRAN)
1445       IF_DEBUG(scheduler,
1446                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1447                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1448                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1449                if (t->block_info.closure!=(StgClosure*)NULL)
1450                  print_bq(t->block_info.closure);
1451                debugBelch("\n"));
1452
1453       // ??? needed; should emit block before
1454       IF_DEBUG(gran, 
1455                DumpGranEvent(GR_DESCHEDULE, t)); 
1456       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1457       /*
1458         ngoq Dogh!
1459       ASSERT(procStatus[CurrentProc]==Busy || 
1460               ((procStatus[CurrentProc]==Fetching) && 
1461               (t->block_info.closure!=(StgClosure*)NULL)));
1462       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1463           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1464             procStatus[CurrentProc]==Fetching)) 
1465         procStatus[CurrentProc] = Idle;
1466       */
1467 # elif defined(PAR)
1468 //++PAR++  blockThread() writes the event (change?)
1469 # endif
1470     break;
1471
1472   case ThreadFinished:
1473     break;
1474
1475   default:
1476     barf("parGlobalStats: unknown return code");
1477     break;
1478     }
1479 #endif
1480 }
1481
1482 /* -----------------------------------------------------------------------------
1483  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1484  * -------------------------------------------------------------------------- */
1485
1486 static rtsBool
1487 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1488 {
1489     // did the task ask for a large block?
1490     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1491         // if so, get one and push it on the front of the nursery.
1492         bdescr *bd;
1493         lnat blocks;
1494         
1495         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1496         
1497         IF_DEBUG(scheduler,
1498                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1499                             (long)t->id, whatNext_strs[t->what_next], blocks));
1500         
1501         // don't do this if the nursery is (nearly) full, we'll GC first.
1502         if (cap->r.rCurrentNursery->link != NULL ||
1503             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1504                                                // if the nursery has only one block.
1505             
1506             ACQUIRE_SM_LOCK
1507             bd = allocGroup( blocks );
1508             RELEASE_SM_LOCK
1509             cap->r.rNursery->n_blocks += blocks;
1510             
1511             // link the new group into the list
1512             bd->link = cap->r.rCurrentNursery;
1513             bd->u.back = cap->r.rCurrentNursery->u.back;
1514             if (cap->r.rCurrentNursery->u.back != NULL) {
1515                 cap->r.rCurrentNursery->u.back->link = bd;
1516             } else {
1517 #if !defined(SMP)
1518                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1519                        g0s0 == cap->r.rNursery);
1520 #endif
1521                 cap->r.rNursery->blocks = bd;
1522             }             
1523             cap->r.rCurrentNursery->u.back = bd;
1524             
1525             // initialise it as a nursery block.  We initialise the
1526             // step, gen_no, and flags field of *every* sub-block in
1527             // this large block, because this is easier than making
1528             // sure that we always find the block head of a large
1529             // block whenever we call Bdescr() (eg. evacuate() and
1530             // isAlive() in the GC would both have to do this, at
1531             // least).
1532             { 
1533                 bdescr *x;
1534                 for (x = bd; x < bd + blocks; x++) {
1535                     x->step = cap->r.rNursery;
1536                     x->gen_no = 0;
1537                     x->flags = 0;
1538                 }
1539             }
1540             
1541             // This assert can be a killer if the app is doing lots
1542             // of large block allocations.
1543             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1544             
1545             // now update the nursery to point to the new block
1546             cap->r.rCurrentNursery = bd;
1547             
1548             // we might be unlucky and have another thread get on the
1549             // run queue before us and steal the large block, but in that
1550             // case the thread will just end up requesting another large
1551             // block.
1552             pushOnRunQueue(cap,t);
1553             return rtsFalse;  /* not actually GC'ing */
1554         }
1555     }
1556     
1557     IF_DEBUG(scheduler,
1558              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1559                         (long)t->id, whatNext_strs[t->what_next]));
1560 #if defined(GRAN)
1561     ASSERT(!is_on_queue(t,CurrentProc));
1562 #elif defined(PARALLEL_HASKELL)
1563     /* Currently we emit a DESCHEDULE event before GC in GUM.
1564        ToDo: either add separate event to distinguish SYSTEM time from rest
1565        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1566     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1567         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1568                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1569         emitSchedule = rtsTrue;
1570     }
1571 #endif
1572       
1573     pushOnRunQueue(cap,t);
1574     return rtsTrue;
1575     /* actual GC is done at the end of the while loop in schedule() */
1576 }
1577
1578 /* -----------------------------------------------------------------------------
1579  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1580  * -------------------------------------------------------------------------- */
1581
1582 static void
1583 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1584 {
1585     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1586                                   (long)t->id, whatNext_strs[t->what_next]));
1587     /* just adjust the stack for this thread, then pop it back
1588      * on the run queue.
1589      */
1590     { 
1591         /* enlarge the stack */
1592         StgTSO *new_t = threadStackOverflow(cap, t);
1593         
1594         /* The TSO attached to this Task may have moved, so update the
1595          * pointer to it.
1596          */
1597         if (task->tso == t) {
1598             task->tso = new_t;
1599         }
1600         pushOnRunQueue(cap,new_t);
1601     }
1602 }
1603
1604 /* -----------------------------------------------------------------------------
1605  * Handle a thread that returned to the scheduler with ThreadYielding
1606  * -------------------------------------------------------------------------- */
1607
1608 static rtsBool
1609 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1610 {
1611     // Reset the context switch flag.  We don't do this just before
1612     // running the thread, because that would mean we would lose ticks
1613     // during GC, which can lead to unfair scheduling (a thread hogs
1614     // the CPU because the tick always arrives during GC).  This way
1615     // penalises threads that do a lot of allocation, but that seems
1616     // better than the alternative.
1617     context_switch = 0;
1618     
1619     /* put the thread back on the run queue.  Then, if we're ready to
1620      * GC, check whether this is the last task to stop.  If so, wake
1621      * up the GC thread.  getThread will block during a GC until the
1622      * GC is finished.
1623      */
1624     IF_DEBUG(scheduler,
1625              if (t->what_next != prev_what_next) {
1626                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1627                             (long)t->id, whatNext_strs[t->what_next]);
1628              } else {
1629                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1630                             (long)t->id, whatNext_strs[t->what_next]);
1631              }
1632         );
1633     
1634     IF_DEBUG(sanity,
1635              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1636              checkTSO(t));
1637     ASSERT(t->link == END_TSO_QUEUE);
1638     
1639     // Shortcut if we're just switching evaluators: don't bother
1640     // doing stack squeezing (which can be expensive), just run the
1641     // thread.
1642     if (t->what_next != prev_what_next) {
1643         return rtsTrue;
1644     }
1645     
1646 #if defined(GRAN)
1647     ASSERT(!is_on_queue(t,CurrentProc));
1648       
1649     IF_DEBUG(sanity,
1650              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1651              checkThreadQsSanity(rtsTrue));
1652
1653 #endif
1654
1655     addToRunQueue(cap,t);
1656
1657 #if defined(GRAN)
1658     /* add a ContinueThread event to actually process the thread */
1659     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1660               ContinueThread,
1661               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1662     IF_GRAN_DEBUG(bq, 
1663                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1664                   G_EVENTQ(0);
1665                   G_CURR_THREADQ(0));
1666 #endif
1667     return rtsFalse;
1668 }
1669
1670 /* -----------------------------------------------------------------------------
1671  * Handle a thread that returned to the scheduler with ThreadBlocked
1672  * -------------------------------------------------------------------------- */
1673
1674 static void
1675 scheduleHandleThreadBlocked( StgTSO *t
1676 #if !defined(GRAN) && !defined(DEBUG)
1677     STG_UNUSED
1678 #endif
1679     )
1680 {
1681 #if defined(GRAN)
1682     IF_DEBUG(scheduler,
1683              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1684                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1685              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1686     
1687     // ??? needed; should emit block before
1688     IF_DEBUG(gran, 
1689              DumpGranEvent(GR_DESCHEDULE, t)); 
1690     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1691     /*
1692       ngoq Dogh!
1693       ASSERT(procStatus[CurrentProc]==Busy || 
1694       ((procStatus[CurrentProc]==Fetching) && 
1695       (t->block_info.closure!=(StgClosure*)NULL)));
1696       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1697       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1698       procStatus[CurrentProc]==Fetching)) 
1699       procStatus[CurrentProc] = Idle;
1700     */
1701 #elif defined(PAR)
1702     IF_DEBUG(scheduler,
1703              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1704                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1705     IF_PAR_DEBUG(bq,
1706                  
1707                  if (t->block_info.closure!=(StgClosure*)NULL) 
1708                  print_bq(t->block_info.closure));
1709     
1710     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1711     blockThread(t);
1712     
1713     /* whatever we schedule next, we must log that schedule */
1714     emitSchedule = rtsTrue;
1715     
1716 #else /* !GRAN */
1717
1718       // We don't need to do anything.  The thread is blocked, and it
1719       // has tidied up its stack and placed itself on whatever queue
1720       // it needs to be on.
1721
1722 #if !defined(SMP)
1723     ASSERT(t->why_blocked != NotBlocked);
1724              // This might not be true under SMP: we don't have
1725              // exclusive access to this TSO, so someone might have
1726              // woken it up by now.  This actually happens: try
1727              // conc023 +RTS -N2.
1728 #endif
1729
1730     IF_DEBUG(scheduler,
1731              debugBelch("--<< thread %d (%s) stopped: ", 
1732                         t->id, whatNext_strs[t->what_next]);
1733              printThreadBlockage(t);
1734              debugBelch("\n"));
1735     
1736     /* Only for dumping event to log file 
1737        ToDo: do I need this in GranSim, too?
1738        blockThread(t);
1739     */
1740 #endif
1741 }
1742
1743 /* -----------------------------------------------------------------------------
1744  * Handle a thread that returned to the scheduler with ThreadFinished
1745  * -------------------------------------------------------------------------- */
1746
1747 static rtsBool
1748 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1749 {
1750     /* Need to check whether this was a main thread, and if so,
1751      * return with the return value.
1752      *
1753      * We also end up here if the thread kills itself with an
1754      * uncaught exception, see Exception.cmm.
1755      */
1756     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1757                                   t->id, whatNext_strs[t->what_next]));
1758
1759 #if defined(GRAN)
1760       endThread(t, CurrentProc); // clean-up the thread
1761 #elif defined(PARALLEL_HASKELL)
1762       /* For now all are advisory -- HWL */
1763       //if(t->priority==AdvisoryPriority) ??
1764       advisory_thread_count--; // JB: Caution with this counter, buggy!
1765       
1766 # if defined(DIST)
1767       if(t->dist.priority==RevalPriority)
1768         FinishReval(t);
1769 # endif
1770     
1771 # if defined(EDENOLD)
1772       // the thread could still have an outport... (BUG)
1773       if (t->eden.outport != -1) {
1774       // delete the outport for the tso which has finished...
1775         IF_PAR_DEBUG(eden_ports,
1776                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1777                               t->eden.outport, t->id));
1778         deleteOPT(t);
1779       }
1780       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1781       if (t->eden.epid != -1) {
1782         IF_PAR_DEBUG(eden_ports,
1783                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1784                            t->id, t->eden.epid));
1785         removeTSOfromProcess(t);
1786       }
1787 # endif 
1788
1789 # if defined(PAR)
1790       if (RtsFlags.ParFlags.ParStats.Full &&
1791           !RtsFlags.ParFlags.ParStats.Suppressed) 
1792         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1793
1794       //  t->par only contains statistics: left out for now...
1795       IF_PAR_DEBUG(fish,
1796                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1797                               t->id,t,t->par.sparkname));
1798 # endif
1799 #endif // PARALLEL_HASKELL
1800
1801       //
1802       // Check whether the thread that just completed was a bound
1803       // thread, and if so return with the result.  
1804       //
1805       // There is an assumption here that all thread completion goes
1806       // through this point; we need to make sure that if a thread
1807       // ends up in the ThreadKilled state, that it stays on the run
1808       // queue so it can be dealt with here.
1809       //
1810
1811       if (t->bound) {
1812
1813           if (t->bound != task) {
1814 #if !defined(THREADED_RTS)
1815               // Must be a bound thread that is not the topmost one.  Leave
1816               // it on the run queue until the stack has unwound to the
1817               // point where we can deal with this.  Leaving it on the run
1818               // queue also ensures that the garbage collector knows about
1819               // this thread and its return value (it gets dropped from the
1820               // all_threads list so there's no other way to find it).
1821               appendToRunQueue(cap,t);
1822               return rtsFalse;
1823 #else
1824               // this cannot happen in the threaded RTS, because a
1825               // bound thread can only be run by the appropriate Task.
1826               barf("finished bound thread that isn't mine");
1827 #endif
1828           }
1829
1830           ASSERT(task->tso == t);
1831
1832           if (t->what_next == ThreadComplete) {
1833               if (task->ret) {
1834                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1835                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1836               }
1837               task->stat = Success;
1838           } else {
1839               if (task->ret) {
1840                   *(task->ret) = NULL;
1841               }
1842               if (interrupted) {
1843                   task->stat = Interrupted;
1844               } else {
1845                   task->stat = Killed;
1846               }
1847           }
1848 #ifdef DEBUG
1849           removeThreadLabel((StgWord)task->tso->id);
1850 #endif
1851           return rtsTrue; // tells schedule() to return
1852       }
1853
1854       return rtsFalse;
1855 }
1856
1857 /* -----------------------------------------------------------------------------
1858  * Perform a heap census, if PROFILING
1859  * -------------------------------------------------------------------------- */
1860
1861 static rtsBool
1862 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1863 {
1864 #if defined(PROFILING)
1865     // When we have +RTS -i0 and we're heap profiling, do a census at
1866     // every GC.  This lets us get repeatable runs for debugging.
1867     if (performHeapProfile ||
1868         (RtsFlags.ProfFlags.profileInterval==0 &&
1869          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1870         GarbageCollect(GetRoots, rtsTrue);
1871         heapCensus();
1872         performHeapProfile = rtsFalse;
1873         return rtsTrue;  // true <=> we already GC'd
1874     }
1875 #endif
1876     return rtsFalse;
1877 }
1878
1879 /* -----------------------------------------------------------------------------
1880  * Perform a garbage collection if necessary
1881  * -------------------------------------------------------------------------- */
1882
1883 static void
1884 scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
1885 {
1886     StgTSO *t;
1887 #ifdef SMP
1888     static volatile StgWord waiting_for_gc;
1889     rtsBool was_waiting;
1890     nat i;
1891 #endif
1892
1893 #ifdef SMP
1894     // In order to GC, there must be no threads running Haskell code.
1895     // Therefore, the GC thread needs to hold *all* the capabilities,
1896     // and release them after the GC has completed.  
1897     //
1898     // This seems to be the simplest way: previous attempts involved
1899     // making all the threads with capabilities give up their
1900     // capabilities and sleep except for the *last* one, which
1901     // actually did the GC.  But it's quite hard to arrange for all
1902     // the other tasks to sleep and stay asleep.
1903     //
1904         
1905     was_waiting = cas(&waiting_for_gc, 0, 1);
1906     if (was_waiting) {
1907         do {
1908             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1909             yieldCapability(&cap,task);
1910         } while (waiting_for_gc);
1911         return;
1912     }
1913
1914     for (i=0; i < n_capabilities; i++) {
1915         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1916         if (cap != &capabilities[i]) {
1917             Capability *pcap = &capabilities[i];
1918             // we better hope this task doesn't get migrated to
1919             // another Capability while we're waiting for this one.
1920             // It won't, because load balancing happens while we have
1921             // all the Capabilities, but even so it's a slightly
1922             // unsavoury invariant.
1923             task->cap = pcap;
1924             context_switch = 1;
1925             waitForReturnCapability(&pcap, task);
1926             if (pcap != &capabilities[i]) {
1927                 barf("scheduleDoGC: got the wrong capability");
1928             }
1929         }
1930     }
1931
1932     waiting_for_gc = rtsFalse;
1933 #endif
1934
1935     /* Kick any transactions which are invalid back to their
1936      * atomically frames.  When next scheduled they will try to
1937      * commit, this commit will fail and they will retry.
1938      */
1939     { 
1940         StgTSO *next;
1941
1942         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1943             if (t->what_next == ThreadRelocated) {
1944                 next = t->link;
1945             } else {
1946                 next = t->global_link;
1947                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1948                     if (!stmValidateNestOfTransactions (t -> trec)) {
1949                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1950                         
1951                         // strip the stack back to the
1952                         // ATOMICALLY_FRAME, aborting the (nested)
1953                         // transaction, and saving the stack of any
1954                         // partially-evaluated thunks on the heap.
1955                         raiseAsync_(cap, t, NULL, rtsTrue, NULL);
1956                         
1957 #ifdef REG_R1
1958                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1959 #endif
1960                     }
1961                 }
1962             }
1963         }
1964     }
1965     
1966     // so this happens periodically:
1967     scheduleCheckBlackHoles(cap);
1968     
1969     IF_DEBUG(scheduler, printAllThreads());
1970
1971     /* everybody back, start the GC.
1972      * Could do it in this thread, or signal a condition var
1973      * to do it in another thread.  Either way, we need to
1974      * broadcast on gc_pending_cond afterward.
1975      */
1976 #if defined(THREADED_RTS)
1977     IF_DEBUG(scheduler,sched_belch("doing GC"));
1978 #endif
1979     GarbageCollect(GetRoots, force_major);
1980     
1981 #if defined(SMP)
1982     // release our stash of capabilities.
1983     for (i = 0; i < n_capabilities; i++) {
1984         if (cap != &capabilities[i]) {
1985             task->cap = &capabilities[i];
1986             releaseCapability(&capabilities[i]);
1987         }
1988     }
1989     task->cap = cap;
1990 #endif
1991
1992 #if defined(GRAN)
1993     /* add a ContinueThread event to continue execution of current thread */
1994     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1995               ContinueThread,
1996               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1997     IF_GRAN_DEBUG(bq, 
1998                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1999                   G_EVENTQ(0);
2000                   G_CURR_THREADQ(0));
2001 #endif /* GRAN */
2002 }
2003
2004 /* ---------------------------------------------------------------------------
2005  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
2006  * used by Control.Concurrent for error checking.
2007  * ------------------------------------------------------------------------- */
2008  
2009 StgBool
2010 rtsSupportsBoundThreads(void)
2011 {
2012 #if defined(THREADED_RTS)
2013   return rtsTrue;
2014 #else
2015   return rtsFalse;
2016 #endif
2017 }
2018
2019 /* ---------------------------------------------------------------------------
2020  * isThreadBound(tso): check whether tso is bound to an OS thread.
2021  * ------------------------------------------------------------------------- */
2022  
2023 StgBool
2024 isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
2025 {
2026 #if defined(THREADED_RTS)
2027   return (tso->bound != NULL);
2028 #endif
2029   return rtsFalse;
2030 }
2031
2032 /* ---------------------------------------------------------------------------
2033  * Singleton fork(). Do not copy any running threads.
2034  * ------------------------------------------------------------------------- */
2035
2036 #if !defined(mingw32_HOST_OS) && !defined(SMP)
2037 #define FORKPROCESS_PRIMOP_SUPPORTED
2038 #endif
2039
2040 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2041 static void 
2042 deleteThreadImmediately(Capability *cap, StgTSO *tso);
2043 #endif
2044 StgInt
2045 forkProcess(HsStablePtr *entry
2046 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2047             STG_UNUSED
2048 #endif
2049            )
2050 {
2051 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2052     Task *task;
2053     pid_t pid;
2054     StgTSO* t,*next;
2055     Capability *cap;
2056     
2057     IF_DEBUG(scheduler,sched_belch("forking!"));
2058     
2059     // ToDo: for SMP, we should probably acquire *all* the capabilities
2060     cap = rts_lock();
2061     
2062     pid = fork();
2063     
2064     if (pid) { // parent
2065         
2066         // just return the pid
2067         rts_unlock(cap);
2068         return pid;
2069         
2070     } else { // child
2071         
2072         // delete all threads
2073         cap->run_queue_hd = END_TSO_QUEUE;
2074         cap->run_queue_tl = END_TSO_QUEUE;
2075         
2076         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2077             next = t->link;
2078             
2079             // don't allow threads to catch the ThreadKilled exception
2080             deleteThreadImmediately(cap,t);
2081         }
2082         
2083         // wipe the task list
2084         ACQUIRE_LOCK(&sched_mutex);
2085         for (task = all_tasks; task != NULL; task=task->all_link) {
2086             if (task != cap->running_task) discardTask(task);
2087         }
2088         RELEASE_LOCK(&sched_mutex);
2089
2090 #if defined(THREADED_RTS)
2091         // wipe our spare workers list.
2092         cap->spare_workers = NULL;
2093 #endif
2094
2095         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2096         rts_checkSchedStatus("forkProcess",cap);
2097         
2098         rts_unlock(cap);
2099         hs_exit();                      // clean up and exit
2100         stg_exit(EXIT_SUCCESS);
2101     }
2102 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2103     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2104     return -1;
2105 #endif
2106 }
2107
2108 /* ---------------------------------------------------------------------------
2109  * Delete the threads on the run queue of the current capability.
2110  * ------------------------------------------------------------------------- */
2111    
2112 static void
2113 deleteRunQueue (Capability *cap)
2114 {
2115     StgTSO *t, *next;
2116     for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2117         ASSERT(t->what_next != ThreadRelocated);
2118         next = t->link;
2119         deleteThread(cap, t);
2120     }
2121 }
2122
2123 /* startThread and  insertThread are now in GranSim.c -- HWL */
2124
2125
2126 /* -----------------------------------------------------------------------------
2127    Managing the suspended_ccalling_tasks list.
2128    Locks required: sched_mutex
2129    -------------------------------------------------------------------------- */
2130
2131 STATIC_INLINE void
2132 suspendTask (Capability *cap, Task *task)
2133 {
2134     ASSERT(task->next == NULL && task->prev == NULL);
2135     task->next = cap->suspended_ccalling_tasks;
2136     task->prev = NULL;
2137     if (cap->suspended_ccalling_tasks) {
2138         cap->suspended_ccalling_tasks->prev = task;
2139     }
2140     cap->suspended_ccalling_tasks = task;
2141 }
2142
2143 STATIC_INLINE void
2144 recoverSuspendedTask (Capability *cap, Task *task)
2145 {
2146     if (task->prev) {
2147         task->prev->next = task->next;
2148     } else {
2149         ASSERT(cap->suspended_ccalling_tasks == task);
2150         cap->suspended_ccalling_tasks = task->next;
2151     }
2152     if (task->next) {
2153         task->next->prev = task->prev;
2154     }
2155     task->next = task->prev = NULL;
2156 }
2157
2158 /* ---------------------------------------------------------------------------
2159  * Suspending & resuming Haskell threads.
2160  * 
2161  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2162  * its capability before calling the C function.  This allows another
2163  * task to pick up the capability and carry on running Haskell
2164  * threads.  It also means that if the C call blocks, it won't lock
2165  * the whole system.
2166  *
2167  * The Haskell thread making the C call is put to sleep for the
2168  * duration of the call, on the susepended_ccalling_threads queue.  We
2169  * give out a token to the task, which it can use to resume the thread
2170  * on return from the C function.
2171  * ------------------------------------------------------------------------- */
2172    
2173 void *
2174 suspendThread (StgRegTable *reg)
2175 {
2176   Capability *cap;
2177   int saved_errno = errno;
2178   StgTSO *tso;
2179   Task *task;
2180
2181   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2182    */
2183   cap = regTableToCapability(reg);
2184
2185   task = cap->running_task;
2186   tso = cap->r.rCurrentTSO;
2187
2188   IF_DEBUG(scheduler,
2189            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2190
2191   // XXX this might not be necessary --SDM
2192   tso->what_next = ThreadRunGHC;
2193
2194   threadPaused(cap,tso);
2195
2196   if(tso->blocked_exceptions == NULL)  {
2197       tso->why_blocked = BlockedOnCCall;
2198       tso->blocked_exceptions = END_TSO_QUEUE;
2199   } else {
2200       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2201   }
2202
2203   // Hand back capability
2204   task->suspended_tso = tso;
2205
2206   ACQUIRE_LOCK(&cap->lock);
2207
2208   suspendTask(cap,task);
2209   cap->in_haskell = rtsFalse;
2210   releaseCapability_(cap);
2211   
2212   RELEASE_LOCK(&cap->lock);
2213
2214 #if defined(THREADED_RTS)
2215   /* Preparing to leave the RTS, so ensure there's a native thread/task
2216      waiting to take over.
2217   */
2218   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2219 #endif
2220
2221   errno = saved_errno;
2222   return task;
2223 }
2224
2225 StgRegTable *
2226 resumeThread (void *task_)
2227 {
2228     StgTSO *tso;
2229     Capability *cap;
2230     int saved_errno = errno;
2231     Task *task = task_;
2232
2233     cap = task->cap;
2234     // Wait for permission to re-enter the RTS with the result.
2235     waitForReturnCapability(&cap,task);
2236     // we might be on a different capability now... but if so, our
2237     // entry on the suspended_ccalling_tasks list will also have been
2238     // migrated.
2239
2240     // Remove the thread from the suspended list
2241     recoverSuspendedTask(cap,task);
2242
2243     tso = task->suspended_tso;
2244     task->suspended_tso = NULL;
2245     tso->link = END_TSO_QUEUE;
2246     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2247     
2248     if (tso->why_blocked == BlockedOnCCall) {
2249         awakenBlockedQueue(cap,tso->blocked_exceptions);
2250         tso->blocked_exceptions = NULL;
2251     }
2252     
2253     /* Reset blocking status */
2254     tso->why_blocked  = NotBlocked;
2255     
2256     cap->r.rCurrentTSO = tso;
2257     cap->in_haskell = rtsTrue;
2258     errno = saved_errno;
2259
2260     return &cap->r;
2261 }
2262
2263 /* ---------------------------------------------------------------------------
2264  * Comparing Thread ids.
2265  *
2266  * This is used from STG land in the implementation of the
2267  * instances of Eq/Ord for ThreadIds.
2268  * ------------------------------------------------------------------------ */
2269
2270 int
2271 cmp_thread(StgPtr tso1, StgPtr tso2) 
2272
2273   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2274   StgThreadID id2 = ((StgTSO *)tso2)->id;
2275  
2276   if (id1 < id2) return (-1);
2277   if (id1 > id2) return 1;
2278   return 0;
2279 }
2280
2281 /* ---------------------------------------------------------------------------
2282  * Fetching the ThreadID from an StgTSO.
2283  *
2284  * This is used in the implementation of Show for ThreadIds.
2285  * ------------------------------------------------------------------------ */
2286 int
2287 rts_getThreadId(StgPtr tso) 
2288 {
2289   return ((StgTSO *)tso)->id;
2290 }
2291
2292 #ifdef DEBUG
2293 void
2294 labelThread(StgPtr tso, char *label)
2295 {
2296   int len;
2297   void *buf;
2298
2299   /* Caveat: Once set, you can only set the thread name to "" */
2300   len = strlen(label)+1;
2301   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2302   strncpy(buf,label,len);
2303   /* Update will free the old memory for us */
2304   updateThreadLabel(((StgTSO *)tso)->id,buf);
2305 }
2306 #endif /* DEBUG */
2307
2308 /* ---------------------------------------------------------------------------
2309    Create a new thread.
2310
2311    The new thread starts with the given stack size.  Before the
2312    scheduler can run, however, this thread needs to have a closure
2313    (and possibly some arguments) pushed on its stack.  See
2314    pushClosure() in Schedule.h.
2315
2316    createGenThread() and createIOThread() (in SchedAPI.h) are
2317    convenient packaged versions of this function.
2318
2319    currently pri (priority) is only used in a GRAN setup -- HWL
2320    ------------------------------------------------------------------------ */
2321 #if defined(GRAN)
2322 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2323 StgTSO *
2324 createThread(nat size, StgInt pri)
2325 #else
2326 StgTSO *
2327 createThread(Capability *cap, nat size)
2328 #endif
2329 {
2330     StgTSO *tso;
2331     nat stack_size;
2332
2333     /* sched_mutex is *not* required */
2334
2335     /* First check whether we should create a thread at all */
2336 #if defined(PARALLEL_HASKELL)
2337     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2338     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2339         threadsIgnored++;
2340         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2341                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2342         return END_TSO_QUEUE;
2343     }
2344     threadsCreated++;
2345 #endif
2346
2347 #if defined(GRAN)
2348     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2349 #endif
2350
2351     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2352
2353     /* catch ridiculously small stack sizes */
2354     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2355         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2356     }
2357
2358     stack_size = size - TSO_STRUCT_SIZEW;
2359     
2360     tso = (StgTSO *)allocateLocal(cap, size);
2361     TICK_ALLOC_TSO(stack_size, 0);
2362
2363     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2364 #if defined(GRAN)
2365     SET_GRAN_HDR(tso, ThisPE);
2366 #endif
2367
2368     // Always start with the compiled code evaluator
2369     tso->what_next = ThreadRunGHC;
2370
2371     tso->why_blocked  = NotBlocked;
2372     tso->blocked_exceptions = NULL;
2373     
2374     tso->saved_errno = 0;
2375     tso->bound = NULL;
2376     
2377     tso->stack_size     = stack_size;
2378     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2379                           - TSO_STRUCT_SIZEW;
2380     tso->sp             = (P_)&(tso->stack) + stack_size;
2381
2382     tso->trec = NO_TREC;
2383     
2384 #ifdef PROFILING
2385     tso->prof.CCCS = CCS_MAIN;
2386 #endif
2387     
2388   /* put a stop frame on the stack */
2389     tso->sp -= sizeofW(StgStopFrame);
2390     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2391     tso->link = END_TSO_QUEUE;
2392     
2393   // ToDo: check this
2394 #if defined(GRAN)
2395     /* uses more flexible routine in GranSim */
2396     insertThread(tso, CurrentProc);
2397 #else
2398     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2399      * from its creation
2400      */
2401 #endif
2402     
2403 #if defined(GRAN) 
2404     if (RtsFlags.GranFlags.GranSimStats.Full) 
2405         DumpGranEvent(GR_START,tso);
2406 #elif defined(PARALLEL_HASKELL)
2407     if (RtsFlags.ParFlags.ParStats.Full) 
2408         DumpGranEvent(GR_STARTQ,tso);
2409     /* HACk to avoid SCHEDULE 
2410        LastTSO = tso; */
2411 #endif
2412     
2413     /* Link the new thread on the global thread list.
2414      */
2415     ACQUIRE_LOCK(&sched_mutex);
2416     tso->id = next_thread_id++;  // while we have the mutex
2417     tso->global_link = all_threads;
2418     all_threads = tso;
2419     RELEASE_LOCK(&sched_mutex);
2420     
2421 #if defined(DIST)
2422     tso->dist.priority = MandatoryPriority; //by default that is...
2423 #endif
2424     
2425 #if defined(GRAN)
2426     tso->gran.pri = pri;
2427 # if defined(DEBUG)
2428     tso->gran.magic = TSO_MAGIC; // debugging only
2429 # endif
2430     tso->gran.sparkname   = 0;
2431     tso->gran.startedat   = CURRENT_TIME; 
2432     tso->gran.exported    = 0;
2433     tso->gran.basicblocks = 0;
2434     tso->gran.allocs      = 0;
2435     tso->gran.exectime    = 0;
2436     tso->gran.fetchtime   = 0;
2437     tso->gran.fetchcount  = 0;
2438     tso->gran.blocktime   = 0;
2439     tso->gran.blockcount  = 0;
2440     tso->gran.blockedat   = 0;
2441     tso->gran.globalsparks = 0;
2442     tso->gran.localsparks  = 0;
2443     if (RtsFlags.GranFlags.Light)
2444         tso->gran.clock  = Now; /* local clock */
2445     else
2446         tso->gran.clock  = 0;
2447     
2448     IF_DEBUG(gran,printTSO(tso));
2449 #elif defined(PARALLEL_HASKELL)
2450 # if defined(DEBUG)
2451     tso->par.magic = TSO_MAGIC; // debugging only
2452 # endif
2453     tso->par.sparkname   = 0;
2454     tso->par.startedat   = CURRENT_TIME; 
2455     tso->par.exported    = 0;
2456     tso->par.basicblocks = 0;
2457     tso->par.allocs      = 0;
2458     tso->par.exectime    = 0;
2459     tso->par.fetchtime   = 0;
2460     tso->par.fetchcount  = 0;
2461     tso->par.blocktime   = 0;
2462     tso->par.blockcount  = 0;
2463     tso->par.blockedat   = 0;
2464     tso->par.globalsparks = 0;
2465     tso->par.localsparks  = 0;
2466 #endif
2467     
2468 #if defined(GRAN)
2469     globalGranStats.tot_threads_created++;
2470     globalGranStats.threads_created_on_PE[CurrentProc]++;
2471     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2472     globalGranStats.tot_sq_probes++;
2473 #elif defined(PARALLEL_HASKELL)
2474     // collect parallel global statistics (currently done together with GC stats)
2475     if (RtsFlags.ParFlags.ParStats.Global &&
2476         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2477         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2478         globalParStats.tot_threads_created++;
2479     }
2480 #endif 
2481     
2482 #if defined(GRAN)
2483     IF_GRAN_DEBUG(pri,
2484                   sched_belch("==__ schedule: Created TSO %d (%p);",
2485                               CurrentProc, tso, tso->id));
2486 #elif defined(PARALLEL_HASKELL)
2487     IF_PAR_DEBUG(verbose,
2488                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2489                              (long)tso->id, tso, advisory_thread_count));
2490 #else
2491     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2492                                    (long)tso->id, (long)tso->stack_size));
2493 #endif    
2494     return tso;
2495 }
2496
2497 #if defined(PAR)
2498 /* RFP:
2499    all parallel thread creation calls should fall through the following routine.
2500 */
2501 StgTSO *
2502 createThreadFromSpark(rtsSpark spark) 
2503 { StgTSO *tso;
2504   ASSERT(spark != (rtsSpark)NULL);
2505 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2506   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2507   { threadsIgnored++;
2508     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2509           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2510     return END_TSO_QUEUE;
2511   }
2512   else
2513   { threadsCreated++;
2514     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2515     if (tso==END_TSO_QUEUE)     
2516       barf("createSparkThread: Cannot create TSO");
2517 #if defined(DIST)
2518     tso->priority = AdvisoryPriority;
2519 #endif
2520     pushClosure(tso,spark);
2521     addToRunQueue(tso);
2522     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2523   }
2524   return tso;
2525 }
2526 #endif
2527
2528 /*
2529   Turn a spark into a thread.
2530   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2531 */
2532 #if 0
2533 StgTSO *
2534 activateSpark (rtsSpark spark) 
2535 {
2536   StgTSO *tso;
2537
2538   tso = createSparkThread(spark);
2539   if (RtsFlags.ParFlags.ParStats.Full) {   
2540     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2541       IF_PAR_DEBUG(verbose,
2542                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2543                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2544   }
2545   // ToDo: fwd info on local/global spark to thread -- HWL
2546   // tso->gran.exported =  spark->exported;
2547   // tso->gran.locked =   !spark->global;
2548   // tso->gran.sparkname = spark->name;
2549
2550   return tso;
2551 }
2552 #endif
2553
2554 /* ---------------------------------------------------------------------------
2555  * scheduleThread()
2556  *
2557  * scheduleThread puts a thread on the end  of the runnable queue.
2558  * This will usually be done immediately after a thread is created.
2559  * The caller of scheduleThread must create the thread using e.g.
2560  * createThread and push an appropriate closure
2561  * on this thread's stack before the scheduler is invoked.
2562  * ------------------------------------------------------------------------ */
2563
2564 void
2565 scheduleThread(Capability *cap, StgTSO *tso)
2566 {
2567     // The thread goes at the *end* of the run-queue, to avoid possible
2568     // starvation of any threads already on the queue.
2569     appendToRunQueue(cap,tso);
2570 }
2571
2572 Capability *
2573 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2574 {
2575     Task *task;
2576
2577     // We already created/initialised the Task
2578     task = cap->running_task;
2579
2580     // This TSO is now a bound thread; make the Task and TSO
2581     // point to each other.
2582     tso->bound = task;
2583
2584     task->tso = tso;
2585     task->ret = ret;
2586     task->stat = NoStatus;
2587
2588     appendToRunQueue(cap,tso);
2589
2590     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2591
2592 #if defined(GRAN)
2593     /* GranSim specific init */
2594     CurrentTSO = m->tso;                // the TSO to run
2595     procStatus[MainProc] = Busy;        // status of main PE
2596     CurrentProc = MainProc;             // PE to run it on
2597 #endif
2598
2599     cap = schedule(cap,task);
2600
2601     ASSERT(task->stat != NoStatus);
2602     ASSERT_CAPABILITY_INVARIANTS(cap,task);
2603
2604     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2605     return cap;
2606 }
2607
2608 /* ----------------------------------------------------------------------------
2609  * Starting Tasks
2610  * ------------------------------------------------------------------------- */
2611
2612 #if defined(THREADED_RTS)
2613 void
2614 workerStart(Task *task)
2615 {
2616     Capability *cap;
2617
2618     // See startWorkerTask().
2619     ACQUIRE_LOCK(&task->lock);
2620     cap = task->cap;
2621     RELEASE_LOCK(&task->lock);
2622
2623     // set the thread-local pointer to the Task:
2624     taskEnter(task);
2625
2626     // schedule() runs without a lock.
2627     cap = schedule(cap,task);
2628
2629     // On exit from schedule(), we have a Capability.
2630     releaseCapability(cap);
2631     taskStop(task);
2632 }
2633 #endif
2634
2635 /* ---------------------------------------------------------------------------
2636  * initScheduler()
2637  *
2638  * Initialise the scheduler.  This resets all the queues - if the
2639  * queues contained any threads, they'll be garbage collected at the
2640  * next pass.
2641  *
2642  * ------------------------------------------------------------------------ */
2643
2644 void 
2645 initScheduler(void)
2646 {
2647 #if defined(GRAN)
2648   nat i;
2649   for (i=0; i<=MAX_PROC; i++) {
2650     run_queue_hds[i]      = END_TSO_QUEUE;
2651     run_queue_tls[i]      = END_TSO_QUEUE;
2652     blocked_queue_hds[i]  = END_TSO_QUEUE;
2653     blocked_queue_tls[i]  = END_TSO_QUEUE;
2654     ccalling_threadss[i]  = END_TSO_QUEUE;
2655     blackhole_queue[i]    = END_TSO_QUEUE;
2656     sleeping_queue        = END_TSO_QUEUE;
2657   }
2658 #elif !defined(THREADED_RTS)
2659   blocked_queue_hd  = END_TSO_QUEUE;
2660   blocked_queue_tl  = END_TSO_QUEUE;
2661   sleeping_queue    = END_TSO_QUEUE;
2662 #endif
2663
2664   blackhole_queue   = END_TSO_QUEUE;
2665   all_threads       = END_TSO_QUEUE;
2666
2667   context_switch = 0;
2668   interrupted    = 0;
2669
2670   RtsFlags.ConcFlags.ctxtSwitchTicks =
2671       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2672       
2673 #if defined(THREADED_RTS)
2674   /* Initialise the mutex and condition variables used by
2675    * the scheduler. */
2676   initMutex(&sched_mutex);
2677 #endif
2678   
2679   ACQUIRE_LOCK(&sched_mutex);
2680
2681   /* A capability holds the state a native thread needs in
2682    * order to execute STG code. At least one capability is
2683    * floating around (only SMP builds have more than one).
2684    */
2685   initCapabilities();
2686
2687   initTaskManager();
2688
2689 #if defined(SMP) || defined(PARALLEL_HASKELL)
2690   initSparkPools();
2691 #endif
2692
2693 #if defined(SMP)
2694   /*
2695    * Eagerly start one worker to run each Capability, except for
2696    * Capability 0.  The idea is that we're probably going to start a
2697    * bound thread on Capability 0 pretty soon, so we don't want a
2698    * worker task hogging it.
2699    */
2700   { 
2701       nat i;
2702       Capability *cap;
2703       for (i = 1; i < n_capabilities; i++) {
2704           cap = &capabilities[i];
2705           ACQUIRE_LOCK(&cap->lock);
2706           startWorkerTask(cap, workerStart);
2707           RELEASE_LOCK(&cap->lock);
2708       }
2709   }
2710 #endif
2711
2712   RELEASE_LOCK(&sched_mutex);
2713 }
2714
2715 void
2716 exitScheduler( void )
2717 {
2718     interrupted = rtsTrue;
2719     shutting_down_scheduler = rtsTrue;
2720
2721 #if defined(THREADED_RTS)
2722     { 
2723         Task *task;
2724         nat i;
2725         
2726         ACQUIRE_LOCK(&sched_mutex);
2727         task = newBoundTask();
2728         RELEASE_LOCK(&sched_mutex);
2729
2730         for (i = 0; i < n_capabilities; i++) {
2731             shutdownCapability(&capabilities[i], task);
2732         }
2733         boundTaskExiting(task);
2734         stopTaskManager();
2735     }
2736 #endif
2737 }
2738
2739 /* ---------------------------------------------------------------------------
2740    Where are the roots that we know about?
2741
2742         - all the threads on the runnable queue
2743         - all the threads on the blocked queue
2744         - all the threads on the sleeping queue
2745         - all the thread currently executing a _ccall_GC
2746         - all the "main threads"
2747      
2748    ------------------------------------------------------------------------ */
2749
2750 /* This has to be protected either by the scheduler monitor, or by the
2751         garbage collection monitor (probably the latter).
2752         KH @ 25/10/99
2753 */
2754
2755 void
2756 GetRoots( evac_fn evac )
2757 {
2758     nat i;
2759     Capability *cap;
2760     Task *task;
2761
2762 #if defined(GRAN)
2763     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2764         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2765             evac((StgClosure **)&run_queue_hds[i]);
2766         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2767             evac((StgClosure **)&run_queue_tls[i]);
2768         
2769         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2770             evac((StgClosure **)&blocked_queue_hds[i]);
2771         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2772             evac((StgClosure **)&blocked_queue_tls[i]);
2773         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2774             evac((StgClosure **)&ccalling_threads[i]);
2775     }
2776
2777     markEventQueue();
2778
2779 #else /* !GRAN */
2780
2781     for (i = 0; i < n_capabilities; i++) {
2782         cap = &capabilities[i];
2783         evac((StgClosure **)&cap->run_queue_hd);
2784         evac((StgClosure **)&cap->run_queue_tl);
2785         
2786         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2787              task=task->next) {
2788             evac((StgClosure **)&task->suspended_tso);
2789         }
2790     }
2791     
2792 #if !defined(THREADED_RTS)
2793     evac((StgClosure **)&blocked_queue_hd);
2794     evac((StgClosure **)&blocked_queue_tl);
2795     evac((StgClosure **)&sleeping_queue);
2796 #endif 
2797 #endif
2798
2799     evac((StgClosure **)&blackhole_queue);
2800
2801 #if defined(SMP) || defined(PARALLEL_HASKELL) || defined(GRAN)
2802     markSparkQueue(evac);
2803 #endif
2804     
2805 #if defined(RTS_USER_SIGNALS)
2806     // mark the signal handlers (signals should be already blocked)
2807     markSignalHandlers(evac);
2808 #endif
2809 }
2810
2811 /* -----------------------------------------------------------------------------
2812    performGC
2813
2814    This is the interface to the garbage collector from Haskell land.
2815    We provide this so that external C code can allocate and garbage
2816    collect when called from Haskell via _ccall_GC.
2817
2818    It might be useful to provide an interface whereby the programmer
2819    can specify more roots (ToDo).
2820    
2821    This needs to be protected by the GC condition variable above.  KH.
2822    -------------------------------------------------------------------------- */
2823
2824 static void (*extra_roots)(evac_fn);
2825
2826 void
2827 performGC(void)
2828 {
2829 #ifdef THREADED_RTS
2830     // ToDo: we have to grab all the capabilities here.
2831     errorBelch("performGC not supported in threaded RTS (yet)");
2832     stg_exit(EXIT_FAILURE);
2833 #endif
2834     /* Obligated to hold this lock upon entry */
2835     GarbageCollect(GetRoots,rtsFalse);
2836 }
2837
2838 void
2839 performMajorGC(void)
2840 {
2841 #ifdef THREADED_RTS
2842     errorBelch("performMayjorGC not supported in threaded RTS (yet)");
2843     stg_exit(EXIT_FAILURE);
2844 #endif
2845     GarbageCollect(GetRoots,rtsTrue);
2846 }
2847
2848 static void
2849 AllRoots(evac_fn evac)
2850 {
2851     GetRoots(evac);             // the scheduler's roots
2852     extra_roots(evac);          // the user's roots
2853 }
2854
2855 void
2856 performGCWithRoots(void (*get_roots)(evac_fn))
2857 {
2858 #ifdef THREADED_RTS
2859     errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
2860     stg_exit(EXIT_FAILURE);
2861 #endif
2862     extra_roots = get_roots;
2863     GarbageCollect(AllRoots,rtsFalse);
2864 }
2865
2866 /* -----------------------------------------------------------------------------
2867    Stack overflow
2868
2869    If the thread has reached its maximum stack size, then raise the
2870    StackOverflow exception in the offending thread.  Otherwise
2871    relocate the TSO into a larger chunk of memory and adjust its stack
2872    size appropriately.
2873    -------------------------------------------------------------------------- */
2874
2875 static StgTSO *
2876 threadStackOverflow(Capability *cap, StgTSO *tso)
2877 {
2878   nat new_stack_size, stack_words;
2879   lnat new_tso_size;
2880   StgPtr new_sp;
2881   StgTSO *dest;
2882
2883   IF_DEBUG(sanity,checkTSO(tso));
2884   if (tso->stack_size >= tso->max_stack_size) {
2885
2886     IF_DEBUG(gc,
2887              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2888                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2889              /* If we're debugging, just print out the top of the stack */
2890              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2891                                               tso->sp+64)));
2892
2893     /* Send this thread the StackOverflow exception */
2894     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2895     return tso;
2896   }
2897
2898   /* Try to double the current stack size.  If that takes us over the
2899    * maximum stack size for this thread, then use the maximum instead.
2900    * Finally round up so the TSO ends up as a whole number of blocks.
2901    */
2902   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2903   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2904                                        TSO_STRUCT_SIZE)/sizeof(W_);
2905   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2906   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2907
2908   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2909
2910   dest = (StgTSO *)allocate(new_tso_size);
2911   TICK_ALLOC_TSO(new_stack_size,0);
2912
2913   /* copy the TSO block and the old stack into the new area */
2914   memcpy(dest,tso,TSO_STRUCT_SIZE);
2915   stack_words = tso->stack + tso->stack_size - tso->sp;
2916   new_sp = (P_)dest + new_tso_size - stack_words;
2917   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2918
2919   /* relocate the stack pointers... */
2920   dest->sp         = new_sp;
2921   dest->stack_size = new_stack_size;
2922         
2923   /* Mark the old TSO as relocated.  We have to check for relocated
2924    * TSOs in the garbage collector and any primops that deal with TSOs.
2925    *
2926    * It's important to set the sp value to just beyond the end
2927    * of the stack, so we don't attempt to scavenge any part of the
2928    * dead TSO's stack.
2929    */
2930   tso->what_next = ThreadRelocated;
2931   tso->link = dest;
2932   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2933   tso->why_blocked = NotBlocked;
2934
2935   IF_PAR_DEBUG(verbose,
2936                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2937                      tso->id, tso, tso->stack_size);
2938                /* If we're debugging, just print out the top of the stack */
2939                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2940                                                 tso->sp+64)));
2941   
2942   IF_DEBUG(sanity,checkTSO(tso));
2943 #if 0
2944   IF_DEBUG(scheduler,printTSO(dest));
2945 #endif
2946
2947   return dest;
2948 }
2949
2950 /* ---------------------------------------------------------------------------
2951    Wake up a queue that was blocked on some resource.
2952    ------------------------------------------------------------------------ */
2953
2954 #if defined(GRAN)
2955 STATIC_INLINE void
2956 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2957 {
2958 }
2959 #elif defined(PARALLEL_HASKELL)
2960 STATIC_INLINE void
2961 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2962 {
2963   /* write RESUME events to log file and
2964      update blocked and fetch time (depending on type of the orig closure) */
2965   if (RtsFlags.ParFlags.ParStats.Full) {
2966     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2967                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2968                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2969     if (emptyRunQueue())
2970       emitSchedule = rtsTrue;
2971
2972     switch (get_itbl(node)->type) {
2973         case FETCH_ME_BQ:
2974           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2975           break;
2976         case RBH:
2977         case FETCH_ME:
2978         case BLACKHOLE_BQ:
2979           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2980           break;
2981 #ifdef DIST
2982         case MVAR:
2983           break;
2984 #endif    
2985         default:
2986           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2987         }
2988       }
2989 }
2990 #endif
2991
2992 #if defined(GRAN)
2993 StgBlockingQueueElement *
2994 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2995 {
2996     StgTSO *tso;
2997     PEs node_loc, tso_loc;
2998
2999     node_loc = where_is(node); // should be lifted out of loop
3000     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3001     tso_loc = where_is((StgClosure *)tso);
3002     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3003       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
3004       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3005       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3006       // insertThread(tso, node_loc);
3007       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3008                 ResumeThread,
3009                 tso, node, (rtsSpark*)NULL);
3010       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3011       // len_local++;
3012       // len++;
3013     } else { // TSO is remote (actually should be FMBQ)
3014       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3015                                   RtsFlags.GranFlags.Costs.gunblocktime +
3016                                   RtsFlags.GranFlags.Costs.latency;
3017       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3018                 UnblockThread,
3019                 tso, node, (rtsSpark*)NULL);
3020       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3021       // len++;
3022     }
3023     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3024     IF_GRAN_DEBUG(bq,
3025                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3026                           (node_loc==tso_loc ? "Local" : "Global"), 
3027                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3028     tso->block_info.closure = NULL;
3029     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
3030                              tso->id, tso));
3031 }
3032 #elif defined(PARALLEL_HASKELL)
3033 StgBlockingQueueElement *
3034 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3035 {
3036     StgBlockingQueueElement *next;
3037
3038     switch (get_itbl(bqe)->type) {
3039     case TSO:
3040       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3041       /* if it's a TSO just push it onto the run_queue */
3042       next = bqe->link;
3043       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3044       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3045       threadRunnable();
3046       unblockCount(bqe, node);
3047       /* reset blocking status after dumping event */
3048       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3049       break;
3050
3051     case BLOCKED_FETCH:
3052       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3053       next = bqe->link;
3054       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3055       PendingFetches = (StgBlockedFetch *)bqe;
3056       break;
3057
3058 # if defined(DEBUG)
3059       /* can ignore this case in a non-debugging setup; 
3060          see comments on RBHSave closures above */
3061     case CONSTR:
3062       /* check that the closure is an RBHSave closure */
3063       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3064              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3065              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3066       break;
3067
3068     default:
3069       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3070            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3071            (StgClosure *)bqe);
3072 # endif
3073     }
3074   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3075   return next;
3076 }
3077 #endif
3078
3079 StgTSO *
3080 unblockOne(Capability *cap, StgTSO *tso)
3081 {
3082   StgTSO *next;
3083
3084   ASSERT(get_itbl(tso)->type == TSO);
3085   ASSERT(tso->why_blocked != NotBlocked);
3086   tso->why_blocked = NotBlocked;
3087   next = tso->link;
3088   tso->link = END_TSO_QUEUE;
3089
3090   // We might have just migrated this TSO to our Capability:
3091   if (tso->bound) {
3092       tso->bound->cap = cap;
3093   }
3094
3095   appendToRunQueue(cap,tso);
3096
3097   // we're holding a newly woken thread, make sure we context switch
3098   // quickly so we can migrate it if necessary.
3099   context_switch = 1;
3100   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3101   return next;
3102 }
3103
3104
3105 #if defined(GRAN)
3106 void 
3107 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3108 {
3109   StgBlockingQueueElement *bqe;
3110   PEs node_loc;
3111   nat len = 0; 
3112
3113   IF_GRAN_DEBUG(bq, 
3114                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3115                       node, CurrentProc, CurrentTime[CurrentProc], 
3116                       CurrentTSO->id, CurrentTSO));
3117
3118   node_loc = where_is(node);
3119
3120   ASSERT(q == END_BQ_QUEUE ||
3121          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3122          get_itbl(q)->type == CONSTR); // closure (type constructor)
3123   ASSERT(is_unique(node));
3124
3125   /* FAKE FETCH: magically copy the node to the tso's proc;
3126      no Fetch necessary because in reality the node should not have been 
3127      moved to the other PE in the first place
3128   */
3129   if (CurrentProc!=node_loc) {
3130     IF_GRAN_DEBUG(bq, 
3131                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3132                         node, node_loc, CurrentProc, CurrentTSO->id, 
3133                         // CurrentTSO, where_is(CurrentTSO),
3134                         node->header.gran.procs));
3135     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3136     IF_GRAN_DEBUG(bq, 
3137                   debugBelch("## new bitmask of node %p is %#x\n",
3138                         node, node->header.gran.procs));
3139     if (RtsFlags.GranFlags.GranSimStats.Global) {
3140       globalGranStats.tot_fake_fetches++;
3141     }
3142   }
3143
3144   bqe = q;
3145   // ToDo: check: ASSERT(CurrentProc==node_loc);
3146   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3147     //next = bqe->link;
3148     /* 
3149        bqe points to the current element in the queue
3150        next points to the next element in the queue
3151     */
3152     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3153     //tso_loc = where_is(tso);
3154     len++;
3155     bqe = unblockOne(bqe, node);
3156   }
3157
3158   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3159      the closure to make room for the anchor of the BQ */
3160   if (bqe!=END_BQ_QUEUE) {
3161     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3162     /*
3163     ASSERT((info_ptr==&RBH_Save_0_info) ||
3164            (info_ptr==&RBH_Save_1_info) ||
3165            (info_ptr==&RBH_Save_2_info));
3166     */
3167     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3168     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3169     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3170
3171     IF_GRAN_DEBUG(bq,
3172                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3173                         node, info_type(node)));
3174   }
3175
3176   /* statistics gathering */
3177   if (RtsFlags.GranFlags.GranSimStats.Global) {
3178     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3179     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3180     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3181     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3182   }
3183   IF_GRAN_DEBUG(bq,
3184                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3185                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3186 }
3187 #elif defined(PARALLEL_HASKELL)
3188 void 
3189 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3190 {
3191   StgBlockingQueueElement *bqe;
3192
3193   IF_PAR_DEBUG(verbose, 
3194                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3195                      node, mytid));
3196 #ifdef DIST  
3197   //RFP
3198   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3199     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3200     return;
3201   }
3202 #endif
3203   
3204   ASSERT(q == END_BQ_QUEUE ||
3205          get_itbl(q)->type == TSO ||           
3206          get_itbl(q)->type == BLOCKED_FETCH || 
3207          get_itbl(q)->type == CONSTR); 
3208
3209   bqe = q;
3210   while (get_itbl(bqe)->type==TSO || 
3211          get_itbl(bqe)->type==BLOCKED_FETCH) {
3212     bqe = unblockOne(bqe, node);
3213   }
3214 }
3215
3216 #else   /* !GRAN && !PARALLEL_HASKELL */
3217
3218 void
3219 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3220 {
3221     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3222                              // Exception.cmm
3223     while (tso != END_TSO_QUEUE) {
3224         tso = unblockOne(cap,tso);
3225     }
3226 }
3227 #endif
3228
3229 /* ---------------------------------------------------------------------------
3230    Interrupt execution
3231    - usually called inside a signal handler so it mustn't do anything fancy.   
3232    ------------------------------------------------------------------------ */
3233
3234 void
3235 interruptStgRts(void)
3236 {
3237     interrupted    = 1;
3238     context_switch = 1;
3239 #if defined(THREADED_RTS)
3240     prodAllCapabilities();
3241 #endif
3242 }
3243
3244 /* -----------------------------------------------------------------------------
3245    Unblock a thread
3246
3247    This is for use when we raise an exception in another thread, which
3248    may be blocked.
3249    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3250    -------------------------------------------------------------------------- */
3251
3252 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3253 /*
3254   NB: only the type of the blocking queue is different in GranSim and GUM
3255       the operations on the queue-elements are the same
3256       long live polymorphism!
3257
3258   Locks: sched_mutex is held upon entry and exit.
3259
3260 */
3261 static void
3262 unblockThread(Capability *cap, StgTSO *tso)
3263 {
3264   StgBlockingQueueElement *t, **last;
3265
3266   switch (tso->why_blocked) {
3267
3268   case NotBlocked:
3269     return;  /* not blocked */
3270
3271   case BlockedOnSTM:
3272     // Be careful: nothing to do here!  We tell the scheduler that the thread
3273     // is runnable and we leave it to the stack-walking code to abort the 
3274     // transaction while unwinding the stack.  We should perhaps have a debugging
3275     // test to make sure that this really happens and that the 'zombie' transaction
3276     // does not get committed.
3277     goto done;
3278
3279   case BlockedOnMVar:
3280     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3281     {
3282       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3283       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3284
3285       last = (StgBlockingQueueElement **)&mvar->head;
3286       for (t = (StgBlockingQueueElement *)mvar->head; 
3287            t != END_BQ_QUEUE; 
3288            last = &t->link, last_tso = t, t = t->link) {
3289         if (t == (StgBlockingQueueElement *)tso) {
3290           *last = (StgBlockingQueueElement *)tso->link;
3291           if (mvar->tail == tso) {
3292             mvar->tail = (StgTSO *)last_tso;
3293           }
3294           goto done;
3295         }
3296       }
3297       barf("unblockThread (MVAR): TSO not found");
3298     }
3299
3300   case BlockedOnBlackHole:
3301     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3302     {
3303       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3304
3305       last = &bq->blocking_queue;
3306       for (t = bq->blocking_queue; 
3307            t != END_BQ_QUEUE; 
3308            last = &t->link, t = t->link) {
3309         if (t == (StgBlockingQueueElement *)tso) {
3310           *last = (StgBlockingQueueElement *)tso->link;
3311           goto done;
3312         }
3313       }
3314       barf("unblockThread (BLACKHOLE): TSO not found");
3315     }
3316
3317   case BlockedOnException:
3318     {
3319       StgTSO *target  = tso->block_info.tso;
3320
3321       ASSERT(get_itbl(target)->type == TSO);
3322
3323       if (target->what_next == ThreadRelocated) {
3324           target = target->link;
3325           ASSERT(get_itbl(target)->type == TSO);
3326       }
3327
3328       ASSERT(target->blocked_exceptions != NULL);
3329
3330       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3331       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3332            t != END_BQ_QUEUE; 
3333            last = &t->link, t = t->link) {
3334         ASSERT(get_itbl(t)->type == TSO);
3335         if (t == (StgBlockingQueueElement *)tso) {
3336           *last = (StgBlockingQueueElement *)tso->link;
3337           goto done;
3338         }
3339       }
3340       barf("unblockThread (Exception): TSO not found");
3341     }
3342
3343   case BlockedOnRead:
3344   case BlockedOnWrite:
3345 #if defined(mingw32_HOST_OS)
3346   case BlockedOnDoProc:
3347 #endif
3348     {
3349       /* take TSO off blocked_queue */
3350       StgBlockingQueueElement *prev = NULL;
3351       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3352            prev = t, t = t->link) {
3353         if (t == (StgBlockingQueueElement *)tso) {
3354           if (prev == NULL) {
3355             blocked_queue_hd = (StgTSO *)t->link;
3356             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3357               blocked_queue_tl = END_TSO_QUEUE;
3358             }
3359           } else {
3360             prev->link = t->link;
3361             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3362               blocked_queue_tl = (StgTSO *)prev;
3363             }
3364           }
3365 #if defined(mingw32_HOST_OS)
3366           /* (Cooperatively) signal that the worker thread should abort
3367            * the request.
3368            */
3369           abandonWorkRequest(tso->block_info.async_result->reqID);
3370 #endif
3371           goto done;
3372         }
3373       }
3374       barf("unblockThread (I/O): TSO not found");
3375     }
3376
3377   case BlockedOnDelay:
3378     {
3379       /* take TSO off sleeping_queue */
3380       StgBlockingQueueElement *prev = NULL;
3381       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3382            prev = t, t = t->link) {
3383         if (t == (StgBlockingQueueElement *)tso) {
3384           if (prev == NULL) {
3385             sleeping_queue = (StgTSO *)t->link;
3386           } else {
3387             prev->link = t->link;
3388           }
3389           goto done;
3390         }
3391       }
3392       barf("unblockThread (delay): TSO not found");
3393     }
3394
3395   default:
3396     barf("unblockThread");
3397   }
3398
3399  done:
3400   tso->link = END_TSO_QUEUE;
3401   tso->why_blocked = NotBlocked;
3402   tso->block_info.closure = NULL;
3403   pushOnRunQueue(cap,tso);
3404 }
3405 #else
3406 static void
3407 unblockThread(Capability *cap, StgTSO *tso)
3408 {
3409   StgTSO *t, **last;
3410   
3411   /* To avoid locking unnecessarily. */
3412   if (tso->why_blocked == NotBlocked) {
3413     return;
3414   }
3415
3416   switch (tso->why_blocked) {
3417
3418   case BlockedOnSTM:
3419     // Be careful: nothing to do here!  We tell the scheduler that the thread
3420     // is runnable and we leave it to the stack-walking code to abort the 
3421     // transaction while unwinding the stack.  We should perhaps have a debugging
3422     // test to make sure that this really happens and that the 'zombie' transaction
3423     // does not get committed.
3424     goto done;
3425
3426   case BlockedOnMVar:
3427     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3428     {
3429       StgTSO *last_tso = END_TSO_QUEUE;
3430       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3431
3432       last = &mvar->head;
3433       for (t = mvar->head; t != END_TSO_QUEUE; 
3434            last = &t->link, last_tso = t, t = t->link) {
3435         if (t == tso) {
3436           *last = tso->link;
3437           if (mvar->tail == tso) {
3438             mvar->tail = last_tso;
3439           }
3440           goto done;
3441         }
3442       }
3443       barf("unblockThread (MVAR): TSO not found");
3444     }
3445
3446   case BlockedOnBlackHole:
3447     {
3448       last = &blackhole_queue;
3449       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3450            last = &t->link, t = t->link) {
3451         if (t == tso) {
3452           *last = tso->link;
3453           goto done;
3454         }
3455       }
3456       barf("unblockThread (BLACKHOLE): TSO not found");
3457     }
3458
3459   case BlockedOnException:
3460     {
3461       StgTSO *target  = tso->block_info.tso;
3462
3463       ASSERT(get_itbl(target)->type == TSO);
3464
3465       while (target->what_next == ThreadRelocated) {
3466           target = target->link;
3467           ASSERT(get_itbl(target)->type == TSO);
3468       }
3469       
3470       ASSERT(target->blocked_exceptions != NULL);
3471
3472       last = &target->blocked_exceptions;
3473       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3474            last = &t->link, t = t->link) {
3475         ASSERT(get_itbl(t)->type == TSO);
3476         if (t == tso) {
3477           *last = tso->link;
3478           goto done;
3479         }
3480       }
3481       barf("unblockThread (Exception): TSO not found");
3482     }
3483
3484 #if !defined(THREADED_RTS)
3485   case BlockedOnRead:
3486   case BlockedOnWrite:
3487 #if defined(mingw32_HOST_OS)
3488   case BlockedOnDoProc:
3489 #endif
3490     {
3491       StgTSO *prev = NULL;
3492       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3493            prev = t, t = t->link) {
3494         if (t == tso) {
3495           if (prev == NULL) {
3496             blocked_queue_hd = t->link;
3497             if (blocked_queue_tl == t) {
3498               blocked_queue_tl = END_TSO_QUEUE;
3499             }
3500           } else {
3501             prev->link = t->link;
3502             if (blocked_queue_tl == t) {
3503               blocked_queue_tl = prev;
3504             }
3505           }
3506 #if defined(mingw32_HOST_OS)
3507           /* (Cooperatively) signal that the worker thread should abort
3508            * the request.
3509            */
3510           abandonWorkRequest(tso->block_info.async_result->reqID);
3511 #endif
3512           goto done;
3513         }
3514       }
3515       barf("unblockThread (I/O): TSO not found");
3516     }
3517
3518   case BlockedOnDelay:
3519     {
3520       StgTSO *prev = NULL;
3521       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3522            prev = t, t = t->link) {
3523         if (t == tso) {
3524           if (prev == NULL) {
3525             sleeping_queue = t->link;
3526           } else {
3527             prev->link = t->link;
3528           }
3529           goto done;
3530         }
3531       }
3532       barf("unblockThread (delay): TSO not found");
3533     }
3534 #endif
3535
3536   default:
3537     barf("unblockThread");
3538   }
3539
3540  done:
3541   tso->link = END_TSO_QUEUE;
3542   tso->why_blocked = NotBlocked;
3543   tso->block_info.closure = NULL;
3544   appendToRunQueue(cap,tso);
3545 }
3546 #endif
3547
3548 /* -----------------------------------------------------------------------------
3549  * checkBlackHoles()
3550  *
3551  * Check the blackhole_queue for threads that can be woken up.  We do
3552  * this periodically: before every GC, and whenever the run queue is
3553  * empty.
3554  *
3555  * An elegant solution might be to just wake up all the blocked
3556  * threads with awakenBlockedQueue occasionally: they'll go back to
3557  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3558  * doesn't give us a way to tell whether we've actually managed to
3559  * wake up any threads, so we would be busy-waiting.
3560  *
3561  * -------------------------------------------------------------------------- */
3562
3563 static rtsBool
3564 checkBlackHoles (Capability *cap)
3565 {
3566     StgTSO **prev, *t;
3567     rtsBool any_woke_up = rtsFalse;
3568     StgHalfWord type;
3569
3570     // blackhole_queue is global:
3571     ASSERT_LOCK_HELD(&sched_mutex);
3572
3573     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3574
3575     // ASSUMES: sched_mutex
3576     prev = &blackhole_queue;
3577     t = blackhole_queue;
3578     while (t != END_TSO_QUEUE) {
3579         ASSERT(t->why_blocked == BlockedOnBlackHole);
3580         type = get_itbl(t->block_info.closure)->type;
3581         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3582             IF_DEBUG(sanity,checkTSO(t));
3583             t = unblockOne(cap, t);
3584             // urk, the threads migrate to the current capability
3585             // here, but we'd like to keep them on the original one.
3586             *prev = t;
3587             any_woke_up = rtsTrue;
3588         } else {
3589             prev = &t->link;
3590             t = t->link;
3591         }
3592     }
3593
3594     return any_woke_up;
3595 }
3596
3597 /* -----------------------------------------------------------------------------
3598  * raiseAsync()
3599  *
3600  * The following function implements the magic for raising an
3601  * asynchronous exception in an existing thread.
3602  *
3603  * We first remove the thread from any queue on which it might be
3604  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3605  *
3606  * We strip the stack down to the innermost CATCH_FRAME, building
3607  * thunks in the heap for all the active computations, so they can 
3608  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3609  * an application of the handler to the exception, and push it on
3610  * the top of the stack.
3611  * 
3612  * How exactly do we save all the active computations?  We create an
3613  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3614  * AP_STACKs pushes everything from the corresponding update frame
3615  * upwards onto the stack.  (Actually, it pushes everything up to the
3616  * next update frame plus a pointer to the next AP_STACK object.
3617  * Entering the next AP_STACK object pushes more onto the stack until we
3618  * reach the last AP_STACK object - at which point the stack should look
3619  * exactly as it did when we killed the TSO and we can continue
3620  * execution by entering the closure on top of the stack.
3621  *
3622  * We can also kill a thread entirely - this happens if either (a) the 
3623  * exception passed to raiseAsync is NULL, or (b) there's no
3624  * CATCH_FRAME on the stack.  In either case, we strip the entire
3625  * stack and replace the thread with a zombie.
3626  *
3627  * ToDo: in SMP mode, this function is only safe if either (a) we hold
3628  * all the Capabilities (eg. in GC), or (b) we own the Capability that
3629  * the TSO is currently blocked on or on the run queue of.
3630  *
3631  * -------------------------------------------------------------------------- */
3632  
3633 void
3634 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3635 {
3636     raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3637 }
3638
3639 void
3640 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3641 {
3642     raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3643 }
3644
3645 static void
3646 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3647             rtsBool stop_at_atomically, StgPtr stop_here)
3648 {
3649     StgRetInfoTable *info;
3650     StgPtr sp, frame;
3651     nat i;
3652   
3653     // Thread already dead?
3654     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3655         return;
3656     }
3657
3658     IF_DEBUG(scheduler, 
3659              sched_belch("raising exception in thread %ld.", (long)tso->id));
3660     
3661     // Remove it from any blocking queues
3662     unblockThread(cap,tso);
3663
3664     sp = tso->sp;
3665     
3666     // The stack freezing code assumes there's a closure pointer on
3667     // the top of the stack, so we have to arrange that this is the case...
3668     //
3669     if (sp[0] == (W_)&stg_enter_info) {
3670         sp++;
3671     } else {
3672         sp--;
3673         sp[0] = (W_)&stg_dummy_ret_closure;
3674     }
3675
3676     frame = sp + 1;
3677     while (stop_here == NULL || frame < stop_here) {
3678
3679         // 1. Let the top of the stack be the "current closure"
3680         //
3681         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3682         // CATCH_FRAME.
3683         //
3684         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3685         // current closure applied to the chunk of stack up to (but not
3686         // including) the update frame.  This closure becomes the "current
3687         // closure".  Go back to step 2.
3688         //
3689         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3690         // top of the stack applied to the exception.
3691         // 
3692         // 5. If it's a STOP_FRAME, then kill the thread.
3693         // 
3694         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3695         // transaction
3696        
3697         info = get_ret_itbl((StgClosure *)frame);
3698
3699         switch (info->i.type) {
3700
3701         case UPDATE_FRAME:
3702         {
3703             StgAP_STACK * ap;
3704             nat words;
3705             
3706             // First build an AP_STACK consisting of the stack chunk above the
3707             // current update frame, with the top word on the stack as the
3708             // fun field.
3709             //
3710             words = frame - sp - 1;
3711             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3712             
3713             ap->size = words;
3714             ap->fun  = (StgClosure *)sp[0];
3715             sp++;
3716             for(i=0; i < (nat)words; ++i) {
3717                 ap->payload[i] = (StgClosure *)*sp++;
3718             }
3719             
3720             SET_HDR(ap,&stg_AP_STACK_info,
3721                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3722             TICK_ALLOC_UP_THK(words+1,0);
3723             
3724             IF_DEBUG(scheduler,
3725                      debugBelch("sched: Updating ");
3726                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3727                      debugBelch(" with ");
3728                      printObj((StgClosure *)ap);
3729                 );
3730
3731             // Replace the updatee with an indirection
3732             //
3733             // Warning: if we're in a loop, more than one update frame on
3734             // the stack may point to the same object.  Be careful not to
3735             // overwrite an IND_OLDGEN in this case, because we'll screw
3736             // up the mutable lists.  To be on the safe side, don't
3737             // overwrite any kind of indirection at all.  See also
3738             // threadSqueezeStack in GC.c, where we have to make a similar
3739             // check.
3740             //
3741             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3742                 // revert the black hole
3743                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3744                                (StgClosure *)ap);
3745             }
3746             sp += sizeofW(StgUpdateFrame) - 1;
3747             sp[0] = (W_)ap; // push onto stack
3748             frame = sp + 1;
3749             continue; //no need to bump frame
3750         }
3751
3752         case STOP_FRAME:
3753             // We've stripped the entire stack, the thread is now dead.
3754             tso->what_next = ThreadKilled;
3755             tso->sp = frame + sizeofW(StgStopFrame);
3756             return;
3757
3758         case CATCH_FRAME:
3759             // If we find a CATCH_FRAME, and we've got an exception to raise,
3760             // then build the THUNK raise(exception), and leave it on
3761             // top of the CATCH_FRAME ready to enter.
3762             //
3763         {
3764 #ifdef PROFILING
3765             StgCatchFrame *cf = (StgCatchFrame *)frame;
3766 #endif
3767             StgThunk *raise;
3768             
3769             if (exception == NULL) break;
3770
3771             // we've got an exception to raise, so let's pass it to the
3772             // handler in this frame.
3773             //
3774             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3775             TICK_ALLOC_SE_THK(1,0);
3776             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3777             raise->payload[0] = exception;
3778             
3779             // throw away the stack from Sp up to the CATCH_FRAME.
3780             //
3781             sp = frame - 1;
3782             
3783             /* Ensure that async excpetions are blocked now, so we don't get
3784              * a surprise exception before we get around to executing the
3785              * handler.
3786              */
3787             if (tso->blocked_exceptions == NULL) {
3788                 tso->blocked_exceptions = END_TSO_QUEUE;
3789             }
3790
3791             /* Put the newly-built THUNK on top of the stack, ready to execute
3792              * when the thread restarts.
3793              */
3794             sp[0] = (W_)raise;
3795             sp[-1] = (W_)&stg_enter_info;
3796             tso->sp = sp-1;
3797             tso->what_next = ThreadRunGHC;
3798             IF_DEBUG(sanity, checkTSO(tso));
3799             return;
3800         }
3801             
3802         case ATOMICALLY_FRAME:
3803             if (stop_at_atomically) {
3804                 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3805                 stmCondemnTransaction(cap, tso -> trec);
3806 #ifdef REG_R1
3807                 tso->sp = frame;
3808 #else
3809                 // R1 is not a register: the return convention for IO in
3810                 // this case puts the return value on the stack, so we
3811                 // need to set up the stack to return to the atomically
3812                 // frame properly...
3813                 tso->sp = frame - 2;
3814                 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3815                 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3816 #endif
3817                 tso->what_next = ThreadRunGHC;
3818                 return;
3819             }
3820             // Not stop_at_atomically... fall through and abort the
3821             // transaction.
3822             
3823         case CATCH_RETRY_FRAME:
3824             // IF we find an ATOMICALLY_FRAME then we abort the
3825             // current transaction and propagate the exception.  In
3826             // this case (unlike ordinary exceptions) we do not care
3827             // whether the transaction is valid or not because its
3828             // possible validity cannot have caused the exception
3829             // and will not be visible after the abort.
3830             IF_DEBUG(stm,
3831                      debugBelch("Found atomically block delivering async exception\n"));
3832             StgTRecHeader *trec = tso -> trec;
3833             StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3834             stmAbortTransaction(cap, trec);
3835             tso -> trec = outer;
3836             break;
3837             
3838         default:
3839             break;
3840         }
3841
3842         // move on to the next stack frame
3843         frame += stack_frame_sizeW((StgClosure *)frame);
3844     }
3845
3846     // if we got here, then we stopped at stop_here
3847     ASSERT(stop_here != NULL);
3848 }
3849
3850 /* -----------------------------------------------------------------------------
3851    Deleting threads
3852
3853    This is used for interruption (^C) and forking, and corresponds to
3854    raising an exception but without letting the thread catch the
3855    exception.
3856    -------------------------------------------------------------------------- */
3857
3858 static void 
3859 deleteThread (Capability *cap, StgTSO *tso)
3860 {
3861   if (tso->why_blocked != BlockedOnCCall &&
3862       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3863       raiseAsync(cap,tso,NULL);
3864   }
3865 }
3866
3867 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3868 static void 
3869 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3870 { // for forkProcess only:
3871   // delete thread without giving it a chance to catch the KillThread exception
3872
3873   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3874       return;
3875   }
3876
3877   if (tso->why_blocked != BlockedOnCCall &&
3878       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3879       unblockThread(cap,tso);
3880   }
3881
3882   tso->what_next = ThreadKilled;
3883 }
3884 #endif
3885
3886 /* -----------------------------------------------------------------------------
3887    raiseExceptionHelper
3888    
3889    This function is called by the raise# primitve, just so that we can
3890    move some of the tricky bits of raising an exception from C-- into
3891    C.  Who knows, it might be a useful re-useable thing here too.
3892    -------------------------------------------------------------------------- */
3893
3894 StgWord
3895 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3896 {
3897     Capability *cap = regTableToCapability(reg);
3898     StgThunk *raise_closure = NULL;
3899     StgPtr p, next;
3900     StgRetInfoTable *info;
3901     //
3902     // This closure represents the expression 'raise# E' where E
3903     // is the exception raise.  It is used to overwrite all the
3904     // thunks which are currently under evaluataion.
3905     //
3906
3907     //    
3908     // LDV profiling: stg_raise_info has THUNK as its closure
3909     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3910     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3911     // 1 does not cause any problem unless profiling is performed.
3912     // However, when LDV profiling goes on, we need to linearly scan
3913     // small object pool, where raise_closure is stored, so we should
3914     // use MIN_UPD_SIZE.
3915     //
3916     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3917     //                                 sizeofW(StgClosure)+1);
3918     //
3919
3920     //
3921     // Walk up the stack, looking for the catch frame.  On the way,
3922     // we update any closures pointed to from update frames with the
3923     // raise closure that we just built.
3924     //
3925     p = tso->sp;
3926     while(1) {
3927         info = get_ret_itbl((StgClosure *)p);
3928         next = p + stack_frame_sizeW((StgClosure *)p);
3929         switch (info->i.type) {
3930             
3931         case UPDATE_FRAME:
3932             // Only create raise_closure if we need to.
3933             if (raise_closure == NULL) {
3934                 raise_closure = 
3935                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3936                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3937                 raise_closure->payload[0] = exception;
3938             }
3939             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3940             p = next;
3941             continue;
3942
3943         case ATOMICALLY_FRAME:
3944             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3945             tso->sp = p;
3946             return ATOMICALLY_FRAME;
3947             
3948         case CATCH_FRAME:
3949             tso->sp = p;
3950             return CATCH_FRAME;
3951
3952         case CATCH_STM_FRAME:
3953             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3954             tso->sp = p;
3955             return CATCH_STM_FRAME;
3956             
3957         case STOP_FRAME:
3958             tso->sp = p;
3959             return STOP_FRAME;
3960
3961         case CATCH_RETRY_FRAME:
3962         default:
3963             p = next; 
3964             continue;
3965         }
3966     }
3967 }
3968
3969
3970 /* -----------------------------------------------------------------------------
3971    findRetryFrameHelper
3972
3973    This function is called by the retry# primitive.  It traverses the stack
3974    leaving tso->sp referring to the frame which should handle the retry.  
3975
3976    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3977    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3978
3979    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3980    despite the similar implementation.
3981
3982    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3983    not be created within memory transactions.
3984    -------------------------------------------------------------------------- */
3985
3986 StgWord
3987 findRetryFrameHelper (StgTSO *tso)
3988 {
3989   StgPtr           p, next;
3990   StgRetInfoTable *info;
3991
3992   p = tso -> sp;
3993   while (1) {
3994     info = get_ret_itbl((StgClosure *)p);
3995     next = p + stack_frame_sizeW((StgClosure *)p);
3996     switch (info->i.type) {
3997       
3998     case ATOMICALLY_FRAME:
3999       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
4000       tso->sp = p;
4001       return ATOMICALLY_FRAME;
4002       
4003     case CATCH_RETRY_FRAME:
4004       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
4005       tso->sp = p;
4006       return CATCH_RETRY_FRAME;
4007       
4008     case CATCH_STM_FRAME:
4009     default:
4010       ASSERT(info->i.type != CATCH_FRAME);
4011       ASSERT(info->i.type != STOP_FRAME);
4012       p = next; 
4013       continue;
4014     }
4015   }
4016 }
4017
4018 /* -----------------------------------------------------------------------------
4019    resurrectThreads is called after garbage collection on the list of
4020    threads found to be garbage.  Each of these threads will be woken
4021    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4022    on an MVar, or NonTermination if the thread was blocked on a Black
4023    Hole.
4024
4025    Locks: assumes we hold *all* the capabilities.
4026    -------------------------------------------------------------------------- */
4027
4028 void
4029 resurrectThreads (StgTSO *threads)
4030 {
4031     StgTSO *tso, *next;
4032     Capability *cap;
4033
4034     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4035         next = tso->global_link;
4036         tso->global_link = all_threads;
4037         all_threads = tso;
4038         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4039         
4040         // Wake up the thread on the Capability it was last on for a
4041         // bound thread, or last_free_capability otherwise.
4042         if (tso->bound) {
4043             cap = tso->bound->cap;
4044         } else {
4045             cap = last_free_capability;
4046         }
4047         
4048         switch (tso->why_blocked) {
4049         case BlockedOnMVar:
4050         case BlockedOnException:
4051             /* Called by GC - sched_mutex lock is currently held. */
4052             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4053             break;
4054         case BlockedOnBlackHole:
4055             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4056             break;
4057         case BlockedOnSTM:
4058             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4059             break;
4060         case NotBlocked:
4061             /* This might happen if the thread was blocked on a black hole
4062              * belonging to a thread that we've just woken up (raiseAsync
4063              * can wake up threads, remember...).
4064              */
4065             continue;
4066         default:
4067             barf("resurrectThreads: thread blocked in a strange way");
4068         }
4069     }
4070 }
4071
4072 /* ----------------------------------------------------------------------------
4073  * Debugging: why is a thread blocked
4074  * [Also provides useful information when debugging threaded programs
4075  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4076    ------------------------------------------------------------------------- */
4077
4078 #if DEBUG
4079 static void
4080 printThreadBlockage(StgTSO *tso)
4081 {
4082   switch (tso->why_blocked) {
4083   case BlockedOnRead:
4084     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4085     break;
4086   case BlockedOnWrite:
4087     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4088     break;
4089 #if defined(mingw32_HOST_OS)
4090     case BlockedOnDoProc:
4091     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4092     break;
4093 #endif
4094   case BlockedOnDelay:
4095     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4096     break;
4097   case BlockedOnMVar:
4098     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4099     break;
4100   case BlockedOnException:
4101     debugBelch("is blocked on delivering an exception to thread %d",
4102             tso->block_info.tso->id);
4103     break;
4104   case BlockedOnBlackHole:
4105     debugBelch("is blocked on a black hole");
4106     break;
4107   case NotBlocked:
4108     debugBelch("is not blocked");
4109     break;
4110 #if defined(PARALLEL_HASKELL)
4111   case BlockedOnGA:
4112     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4113             tso->block_info.closure, info_type(tso->block_info.closure));
4114     break;
4115   case BlockedOnGA_NoSend:
4116     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4117             tso->block_info.closure, info_type(tso->block_info.closure));
4118     break;
4119 #endif
4120   case BlockedOnCCall:
4121     debugBelch("is blocked on an external call");
4122     break;
4123   case BlockedOnCCall_NoUnblockExc:
4124     debugBelch("is blocked on an external call (exceptions were already blocked)");
4125     break;
4126   case BlockedOnSTM:
4127     debugBelch("is blocked on an STM operation");
4128     break;
4129   default:
4130     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4131          tso->why_blocked, tso->id, tso);
4132   }
4133 }
4134
4135 void
4136 printThreadStatus(StgTSO *t)
4137 {
4138     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4139     {
4140       void *label = lookupThreadLabel(t->id);
4141       if (label) debugBelch("[\"%s\"] ",(char *)label);
4142     }
4143     if (t->what_next == ThreadRelocated) {
4144         debugBelch("has been relocated...\n");
4145     } else {
4146         switch (t->what_next) {
4147         case ThreadKilled:
4148             debugBelch("has been killed");
4149             break;
4150         case ThreadComplete:
4151             debugBelch("has completed");
4152             break;
4153         default:
4154             printThreadBlockage(t);
4155         }
4156         debugBelch("\n");
4157     }
4158 }
4159
4160 void
4161 printAllThreads(void)
4162 {
4163   StgTSO *t, *next;
4164   nat i;
4165   Capability *cap;
4166
4167 # if defined(GRAN)
4168   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4169   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4170                        time_string, rtsFalse/*no commas!*/);
4171
4172   debugBelch("all threads at [%s]:\n", time_string);
4173 # elif defined(PARALLEL_HASKELL)
4174   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4175   ullong_format_string(CURRENT_TIME,
4176                        time_string, rtsFalse/*no commas!*/);
4177
4178   debugBelch("all threads at [%s]:\n", time_string);
4179 # else
4180   debugBelch("all threads:\n");
4181 # endif
4182
4183   for (i = 0; i < n_capabilities; i++) {
4184       cap = &capabilities[i];
4185       debugBelch("threads on capability %d:\n", cap->no);
4186       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4187           printThreadStatus(t);
4188       }
4189   }
4190
4191   debugBelch("other threads:\n");
4192   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4193       if (t->why_blocked != NotBlocked) {
4194           printThreadStatus(t);
4195       }
4196       if (t->what_next == ThreadRelocated) {
4197           next = t->link;
4198       } else {
4199           next = t->global_link;
4200       }
4201   }
4202 }
4203
4204 // useful from gdb
4205 void 
4206 printThreadQueue(StgTSO *t)
4207 {
4208     nat i = 0;
4209     for (; t != END_TSO_QUEUE; t = t->link) {
4210         printThreadStatus(t);
4211         i++;
4212     }
4213     debugBelch("%d threads on queue\n", i);
4214 }
4215
4216 /* 
4217    Print a whole blocking queue attached to node (debugging only).
4218 */
4219 # if defined(PARALLEL_HASKELL)
4220 void 
4221 print_bq (StgClosure *node)
4222 {
4223   StgBlockingQueueElement *bqe;
4224   StgTSO *tso;
4225   rtsBool end;
4226
4227   debugBelch("## BQ of closure %p (%s): ",
4228           node, info_type(node));
4229
4230   /* should cover all closures that may have a blocking queue */
4231   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4232          get_itbl(node)->type == FETCH_ME_BQ ||
4233          get_itbl(node)->type == RBH ||
4234          get_itbl(node)->type == MVAR);
4235     
4236   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4237
4238   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4239 }
4240
4241 /* 
4242    Print a whole blocking queue starting with the element bqe.
4243 */
4244 void 
4245 print_bqe (StgBlockingQueueElement *bqe)
4246 {
4247   rtsBool end;
4248
4249   /* 
4250      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4251   */
4252   for (end = (bqe==END_BQ_QUEUE);
4253        !end; // iterate until bqe points to a CONSTR
4254        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4255        bqe = end ? END_BQ_QUEUE : bqe->link) {
4256     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4257     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4258     /* types of closures that may appear in a blocking queue */
4259     ASSERT(get_itbl(bqe)->type == TSO ||           
4260            get_itbl(bqe)->type == BLOCKED_FETCH || 
4261            get_itbl(bqe)->type == CONSTR); 
4262     /* only BQs of an RBH end with an RBH_Save closure */
4263     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4264
4265     switch (get_itbl(bqe)->type) {
4266     case TSO:
4267       debugBelch(" TSO %u (%x),",
4268               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4269       break;
4270     case BLOCKED_FETCH:
4271       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4272               ((StgBlockedFetch *)bqe)->node, 
4273               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4274               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4275               ((StgBlockedFetch *)bqe)->ga.weight);
4276       break;
4277     case CONSTR:
4278       debugBelch(" %s (IP %p),",
4279               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4280                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4281                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4282                "RBH_Save_?"), get_itbl(bqe));
4283       break;
4284     default:
4285       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4286            info_type((StgClosure *)bqe)); // , node, info_type(node));
4287       break;
4288     }
4289   } /* for */
4290   debugBelch("\n");
4291 }
4292 # elif defined(GRAN)
4293 void 
4294 print_bq (StgClosure *node)
4295 {
4296   StgBlockingQueueElement *bqe;
4297   PEs node_loc, tso_loc;
4298   rtsBool end;
4299
4300   /* should cover all closures that may have a blocking queue */
4301   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4302          get_itbl(node)->type == FETCH_ME_BQ ||
4303          get_itbl(node)->type == RBH);
4304     
4305   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4306   node_loc = where_is(node);
4307
4308   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4309           node, info_type(node), node_loc);
4310
4311   /* 
4312      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4313   */
4314   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4315        !end; // iterate until bqe points to a CONSTR
4316        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4317     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4318     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4319     /* types of closures that may appear in a blocking queue */
4320     ASSERT(get_itbl(bqe)->type == TSO ||           
4321            get_itbl(bqe)->type == CONSTR); 
4322     /* only BQs of an RBH end with an RBH_Save closure */
4323     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4324
4325     tso_loc = where_is((StgClosure *)bqe);
4326     switch (get_itbl(bqe)->type) {
4327     case TSO:
4328       debugBelch(" TSO %d (%p) on [PE %d],",
4329               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4330       break;
4331     case CONSTR:
4332       debugBelch(" %s (IP %p),",
4333               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4334                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4335                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4336                "RBH_Save_?"), get_itbl(bqe));
4337       break;
4338     default:
4339       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4340            info_type((StgClosure *)bqe), node, info_type(node));
4341       break;
4342     }
4343   } /* for */
4344   debugBelch("\n");
4345 }
4346 # endif
4347
4348 #if defined(PARALLEL_HASKELL)
4349 static nat
4350 run_queue_len(void)
4351 {
4352     nat i;
4353     StgTSO *tso;
4354     
4355     for (i=0, tso=run_queue_hd; 
4356          tso != END_TSO_QUEUE;
4357          i++, tso=tso->link) {
4358         /* nothing */
4359     }
4360         
4361     return i;
4362 }
4363 #endif
4364
4365 void
4366 sched_belch(char *s, ...)
4367 {
4368     va_list ap;
4369     va_start(ap,s);
4370 #ifdef THREADED_RTS
4371     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4372 #elif defined(PARALLEL_HASKELL)
4373     debugBelch("== ");
4374 #else
4375     debugBelch("sched: ");
4376 #endif
4377     vdebugBelch(s, ap);
4378     debugBelch("\n");
4379     va_end(ap);
4380 }
4381
4382 #endif /* DEBUG */