Merge the smp and threaded RTS ways
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 /* -----------------------------------------------------------------------------
76  * Global variables
77  * -------------------------------------------------------------------------- */
78
79 #if defined(GRAN)
80
81 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
82 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
83
84 /* 
85    In GranSim we have a runnable and a blocked queue for each processor.
86    In order to minimise code changes new arrays run_queue_hds/tls
87    are created. run_queue_hd is then a short cut (macro) for
88    run_queue_hds[CurrentProc] (see GranSim.h).
89    -- HWL
90 */
91 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
92 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
93 StgTSO *ccalling_threadss[MAX_PROC];
94 /* We use the same global list of threads (all_threads) in GranSim as in
95    the std RTS (i.e. we are cheating). However, we don't use this list in
96    the GranSim specific code at the moment (so we are only potentially
97    cheating).  */
98
99 #else /* !GRAN */
100
101 #if !defined(THREADED_RTS)
102 // Blocked/sleeping thrads
103 StgTSO *blocked_queue_hd = NULL;
104 StgTSO *blocked_queue_tl = NULL;
105 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
106 #endif
107
108 /* Threads blocked on blackholes.
109  * LOCK: sched_mutex+capability, or all capabilities
110  */
111 StgTSO *blackhole_queue = NULL;
112 #endif
113
114 /* The blackhole_queue should be checked for threads to wake up.  See
115  * Schedule.h for more thorough comment.
116  * LOCK: none (doesn't matter if we miss an update)
117  */
118 rtsBool blackholes_need_checking = rtsFalse;
119
120 /* Linked list of all threads.
121  * Used for detecting garbage collected threads.
122  * LOCK: sched_mutex+capability, or all capabilities
123  */
124 StgTSO *all_threads = NULL;
125
126 /* flag set by signal handler to precipitate a context switch
127  * LOCK: none (just an advisory flag)
128  */
129 int context_switch = 0;
130
131 /* flag that tracks whether we have done any execution in this time slice.
132  * LOCK: currently none, perhaps we should lock (but needs to be
133  * updated in the fast path of the scheduler).
134  */
135 nat recent_activity = ACTIVITY_YES;
136
137 /* if this flag is set as well, give up execution
138  * LOCK: none (changes once, from false->true)
139  */
140 rtsBool interrupted = rtsFalse;
141
142 /* Next thread ID to allocate.
143  * LOCK: sched_mutex
144  */
145 static StgThreadID next_thread_id = 1;
146
147 /* The smallest stack size that makes any sense is:
148  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
149  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
150  *  + 1                       (the closure to enter)
151  *  + 1                       (stg_ap_v_ret)
152  *  + 1                       (spare slot req'd by stg_ap_v_ret)
153  *
154  * A thread with this stack will bomb immediately with a stack
155  * overflow, which will increase its stack size.  
156  */
157 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
158
159 #if defined(GRAN)
160 StgTSO *CurrentTSO;
161 #endif
162
163 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
164  *  exists - earlier gccs apparently didn't.
165  *  -= chak
166  */
167 StgTSO dummy_tso;
168
169 /*
170  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
171  * in an MT setting, needed to signal that a worker thread shouldn't hang around
172  * in the scheduler when it is out of work.
173  */
174 rtsBool shutting_down_scheduler = rtsFalse;
175
176 /*
177  * This mutex protects most of the global scheduler data in
178  * the THREADED_RTS runtime.
179  */
180 #if defined(THREADED_RTS)
181 Mutex sched_mutex;
182 #endif
183
184 #if defined(PARALLEL_HASKELL)
185 StgTSO *LastTSO;
186 rtsTime TimeOfLastYield;
187 rtsBool emitSchedule = rtsTrue;
188 #endif
189
190 /* -----------------------------------------------------------------------------
191  * static function prototypes
192  * -------------------------------------------------------------------------- */
193
194 static Capability *schedule (Capability *initialCapability, Task *task);
195
196 //
197 // These function all encapsulate parts of the scheduler loop, and are
198 // abstracted only to make the structure and control flow of the
199 // scheduler clearer.
200 //
201 static void schedulePreLoop (void);
202 #if defined(THREADED_RTS)
203 static void schedulePushWork(Capability *cap, Task *task);
204 #endif
205 static void scheduleStartSignalHandlers (Capability *cap);
206 static void scheduleCheckBlockedThreads (Capability *cap);
207 static void scheduleCheckBlackHoles (Capability *cap);
208 static void scheduleDetectDeadlock (Capability *cap, Task *task);
209 #if defined(GRAN)
210 static StgTSO *scheduleProcessEvent(rtsEvent *event);
211 #endif
212 #if defined(PARALLEL_HASKELL)
213 static StgTSO *scheduleSendPendingMessages(void);
214 static void scheduleActivateSpark(void);
215 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
216 #endif
217 #if defined(PAR) || defined(GRAN)
218 static void scheduleGranParReport(void);
219 #endif
220 static void schedulePostRunThread(void);
221 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
222 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
223                                          StgTSO *t);
224 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
225                                     nat prev_what_next );
226 static void scheduleHandleThreadBlocked( StgTSO *t );
227 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
228                                              StgTSO *t );
229 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
230 static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major,
231                          void (*get_roots)(evac_fn));
232
233 static void unblockThread(Capability *cap, StgTSO *tso);
234 static rtsBool checkBlackHoles(Capability *cap);
235 static void AllRoots(evac_fn evac);
236
237 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
238
239 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
240                         rtsBool stop_at_atomically, StgPtr stop_here);
241
242 static void deleteThread (Capability *cap, StgTSO *tso);
243 static void deleteRunQueue (Capability *cap);
244
245 #ifdef DEBUG
246 static void printThreadBlockage(StgTSO *tso);
247 static void printThreadStatus(StgTSO *tso);
248 void printThreadQueue(StgTSO *tso);
249 #endif
250
251 #if defined(PARALLEL_HASKELL)
252 StgTSO * createSparkThread(rtsSpark spark);
253 StgTSO * activateSpark (rtsSpark spark);  
254 #endif
255
256 #ifdef DEBUG
257 static char *whatNext_strs[] = {
258   "(unknown)",
259   "ThreadRunGHC",
260   "ThreadInterpret",
261   "ThreadKilled",
262   "ThreadRelocated",
263   "ThreadComplete"
264 };
265 #endif
266
267 /* -----------------------------------------------------------------------------
268  * Putting a thread on the run queue: different scheduling policies
269  * -------------------------------------------------------------------------- */
270
271 STATIC_INLINE void
272 addToRunQueue( Capability *cap, StgTSO *t )
273 {
274 #if defined(PARALLEL_HASKELL)
275     if (RtsFlags.ParFlags.doFairScheduling) { 
276         // this does round-robin scheduling; good for concurrency
277         appendToRunQueue(cap,t);
278     } else {
279         // this does unfair scheduling; good for parallelism
280         pushOnRunQueue(cap,t);
281     }
282 #else
283     // this does round-robin scheduling; good for concurrency
284     appendToRunQueue(cap,t);
285 #endif
286 }
287
288 /* ---------------------------------------------------------------------------
289    Main scheduling loop.
290
291    We use round-robin scheduling, each thread returning to the
292    scheduler loop when one of these conditions is detected:
293
294       * out of heap space
295       * timer expires (thread yields)
296       * thread blocks
297       * thread ends
298       * stack overflow
299
300    GRAN version:
301      In a GranSim setup this loop iterates over the global event queue.
302      This revolves around the global event queue, which determines what 
303      to do next. Therefore, it's more complicated than either the 
304      concurrent or the parallel (GUM) setup.
305
306    GUM version:
307      GUM iterates over incoming messages.
308      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
309      and sends out a fish whenever it has nothing to do; in-between
310      doing the actual reductions (shared code below) it processes the
311      incoming messages and deals with delayed operations 
312      (see PendingFetches).
313      This is not the ugliest code you could imagine, but it's bloody close.
314
315    ------------------------------------------------------------------------ */
316
317 static Capability *
318 schedule (Capability *initialCapability, Task *task)
319 {
320   StgTSO *t;
321   Capability *cap;
322   StgThreadReturnCode ret;
323 #if defined(GRAN)
324   rtsEvent *event;
325 #elif defined(PARALLEL_HASKELL)
326   StgTSO *tso;
327   GlobalTaskId pe;
328   rtsBool receivedFinish = rtsFalse;
329 # if defined(DEBUG)
330   nat tp_size, sp_size; // stats only
331 # endif
332 #endif
333   nat prev_what_next;
334   rtsBool ready_to_gc;
335 #if defined(THREADED_RTS)
336   rtsBool first = rtsTrue;
337 #endif
338   
339   cap = initialCapability;
340
341   // Pre-condition: this task owns initialCapability.
342   // The sched_mutex is *NOT* held
343   // NB. on return, we still hold a capability.
344
345   IF_DEBUG(scheduler,
346            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
347                        task, initialCapability);
348       );
349
350   schedulePreLoop();
351
352   // -----------------------------------------------------------
353   // Scheduler loop starts here:
354
355 #if defined(PARALLEL_HASKELL)
356 #define TERMINATION_CONDITION        (!receivedFinish)
357 #elif defined(GRAN)
358 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
359 #else
360 #define TERMINATION_CONDITION        rtsTrue
361 #endif
362
363   while (TERMINATION_CONDITION) {
364
365 #if defined(GRAN)
366       /* Choose the processor with the next event */
367       CurrentProc = event->proc;
368       CurrentTSO = event->tso;
369 #endif
370
371 #if defined(THREADED_RTS)
372       if (first) {
373           // don't yield the first time, we want a chance to run this
374           // thread for a bit, even if there are others banging at the
375           // door.
376           first = rtsFalse;
377           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
378       } else {
379           // Yield the capability to higher-priority tasks if necessary.
380           yieldCapability(&cap, task);
381       }
382 #endif
383       
384 #if defined(THREADED_RTS)
385       schedulePushWork(cap,task);
386 #endif
387
388     // Check whether we have re-entered the RTS from Haskell without
389     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
390     // call).
391     if (cap->in_haskell) {
392           errorBelch("schedule: re-entered unsafely.\n"
393                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
394           stg_exit(EXIT_FAILURE);
395     }
396
397     //
398     // Test for interruption.  If interrupted==rtsTrue, then either
399     // we received a keyboard interrupt (^C), or the scheduler is
400     // trying to shut down all the tasks (shutting_down_scheduler) in
401     // the threaded RTS.
402     //
403     if (interrupted) {
404         deleteRunQueue(cap);
405 #if defined(THREADED_RTS)
406         discardSparksCap(cap);
407 #endif
408         if (shutting_down_scheduler) {
409             IF_DEBUG(scheduler, sched_belch("shutting down"));
410             // If we are a worker, just exit.  If we're a bound thread
411             // then we will exit below when we've removed our TSO from
412             // the run queue.
413             if (task->tso == NULL && emptyRunQueue(cap)) {
414                 return cap;
415             }
416         } else {
417             IF_DEBUG(scheduler, sched_belch("interrupted"));
418         }
419     }
420
421 #if defined(THREADED_RTS)
422     // If the run queue is empty, take a spark and turn it into a thread.
423     {
424         if (emptyRunQueue(cap)) {
425             StgClosure *spark;
426             spark = findSpark(cap);
427             if (spark != NULL) {
428                 IF_DEBUG(scheduler,
429                          sched_belch("turning spark of closure %p into a thread",
430                                      (StgClosure *)spark));
431                 createSparkThread(cap,spark);     
432             }
433         }
434     }
435 #endif // THREADED_RTS
436
437     scheduleStartSignalHandlers(cap);
438
439     // Only check the black holes here if we've nothing else to do.
440     // During normal execution, the black hole list only gets checked
441     // at GC time, to avoid repeatedly traversing this possibly long
442     // list each time around the scheduler.
443     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
444
445     scheduleCheckBlockedThreads(cap);
446
447     scheduleDetectDeadlock(cap,task);
448
449     // Normally, the only way we can get here with no threads to
450     // run is if a keyboard interrupt received during 
451     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
452     // Additionally, it is not fatal for the
453     // threaded RTS to reach here with no threads to run.
454     //
455     // win32: might be here due to awaitEvent() being abandoned
456     // as a result of a console event having been delivered.
457     if ( emptyRunQueue(cap) ) {
458 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
459         ASSERT(interrupted);
460 #endif
461         continue; // nothing to do
462     }
463
464 #if defined(PARALLEL_HASKELL)
465     scheduleSendPendingMessages();
466     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
467         continue;
468
469 #if defined(SPARKS)
470     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
471 #endif
472
473     /* If we still have no work we need to send a FISH to get a spark
474        from another PE */
475     if (emptyRunQueue(cap)) {
476         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
477         ASSERT(rtsFalse); // should not happen at the moment
478     }
479     // from here: non-empty run queue.
480     //  TODO: merge above case with this, only one call processMessages() !
481     if (PacketsWaiting()) {  /* process incoming messages, if
482                                 any pending...  only in else
483                                 because getRemoteWork waits for
484                                 messages as well */
485         receivedFinish = processMessages();
486     }
487 #endif
488
489 #if defined(GRAN)
490     scheduleProcessEvent(event);
491 #endif
492
493     // 
494     // Get a thread to run
495     //
496     t = popRunQueue(cap);
497
498 #if defined(GRAN) || defined(PAR)
499     scheduleGranParReport(); // some kind of debuging output
500 #else
501     // Sanity check the thread we're about to run.  This can be
502     // expensive if there is lots of thread switching going on...
503     IF_DEBUG(sanity,checkTSO(t));
504 #endif
505
506 #if defined(THREADED_RTS)
507     // Check whether we can run this thread in the current task.
508     // If not, we have to pass our capability to the right task.
509     {
510         Task *bound = t->bound;
511       
512         if (bound) {
513             if (bound == task) {
514                 IF_DEBUG(scheduler,
515                          sched_belch("### Running thread %d in bound thread",
516                                      t->id));
517                 // yes, the Haskell thread is bound to the current native thread
518             } else {
519                 IF_DEBUG(scheduler,
520                          sched_belch("### thread %d bound to another OS thread",
521                                      t->id));
522                 // no, bound to a different Haskell thread: pass to that thread
523                 pushOnRunQueue(cap,t);
524                 continue;
525             }
526         } else {
527             // The thread we want to run is unbound.
528             if (task->tso) { 
529                 IF_DEBUG(scheduler,
530                          sched_belch("### this OS thread cannot run thread %d", t->id));
531                 // no, the current native thread is bound to a different
532                 // Haskell thread, so pass it to any worker thread
533                 pushOnRunQueue(cap,t);
534                 continue; 
535             }
536         }
537     }
538 #endif
539
540     cap->r.rCurrentTSO = t;
541     
542     /* context switches are initiated by the timer signal, unless
543      * the user specified "context switch as often as possible", with
544      * +RTS -C0
545      */
546     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
547         && !emptyThreadQueues(cap)) {
548         context_switch = 1;
549     }
550          
551 run_thread:
552
553     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
554                               (long)t->id, whatNext_strs[t->what_next]));
555
556 #if defined(PROFILING)
557     startHeapProfTimer();
558 #endif
559
560     // ----------------------------------------------------------------------
561     // Run the current thread 
562
563     prev_what_next = t->what_next;
564
565     errno = t->saved_errno;
566     cap->in_haskell = rtsTrue;
567
568     dirtyTSO(t);
569
570     recent_activity = ACTIVITY_YES;
571
572     switch (prev_what_next) {
573         
574     case ThreadKilled:
575     case ThreadComplete:
576         /* Thread already finished, return to scheduler. */
577         ret = ThreadFinished;
578         break;
579         
580     case ThreadRunGHC:
581     {
582         StgRegTable *r;
583         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
584         cap = regTableToCapability(r);
585         ret = r->rRet;
586         break;
587     }
588     
589     case ThreadInterpret:
590         cap = interpretBCO(cap);
591         ret = cap->r.rRet;
592         break;
593         
594     default:
595         barf("schedule: invalid what_next field");
596     }
597
598     cap->in_haskell = rtsFalse;
599
600     // The TSO might have moved, eg. if it re-entered the RTS and a GC
601     // happened.  So find the new location:
602     t = cap->r.rCurrentTSO;
603
604     // We have run some Haskell code: there might be blackhole-blocked
605     // threads to wake up now.
606     // Lock-free test here should be ok, we're just setting a flag.
607     if ( blackhole_queue != END_TSO_QUEUE ) {
608         blackholes_need_checking = rtsTrue;
609     }
610     
611     // And save the current errno in this thread.
612     // XXX: possibly bogus for SMP because this thread might already
613     // be running again, see code below.
614     t->saved_errno = errno;
615
616 #ifdef SMP
617     // If ret is ThreadBlocked, and this Task is bound to the TSO that
618     // blocked, we are in limbo - the TSO is now owned by whatever it
619     // is blocked on, and may in fact already have been woken up,
620     // perhaps even on a different Capability.  It may be the case
621     // that task->cap != cap.  We better yield this Capability
622     // immediately and return to normaility.
623     if (ret == ThreadBlocked) {
624         IF_DEBUG(scheduler,
625                  sched_belch("--<< thread %d (%s) stopped: blocked\n",
626                              t->id, whatNext_strs[t->what_next]));
627         continue;
628     }
629 #endif
630
631     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
632
633     // ----------------------------------------------------------------------
634     
635     // Costs for the scheduler are assigned to CCS_SYSTEM
636 #if defined(PROFILING)
637     stopHeapProfTimer();
638     CCCS = CCS_SYSTEM;
639 #endif
640     
641 #if defined(THREADED_RTS)
642     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
643 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
644     IF_DEBUG(scheduler,debugBelch("sched: "););
645 #endif
646     
647     schedulePostRunThread();
648
649     ready_to_gc = rtsFalse;
650
651     switch (ret) {
652     case HeapOverflow:
653         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
654         break;
655
656     case StackOverflow:
657         scheduleHandleStackOverflow(cap,task,t);
658         break;
659
660     case ThreadYielding:
661         if (scheduleHandleYield(cap, t, prev_what_next)) {
662             // shortcut for switching between compiler/interpreter:
663             goto run_thread; 
664         }
665         break;
666
667     case ThreadBlocked:
668         scheduleHandleThreadBlocked(t);
669         break;
670
671     case ThreadFinished:
672         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
673         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
674         break;
675
676     default:
677       barf("schedule: invalid thread return code %d", (int)ret);
678     }
679
680     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
681     if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse,GetRoots); }
682   } /* end of while() */
683
684   IF_PAR_DEBUG(verbose,
685                debugBelch("== Leaving schedule() after having received Finish\n"));
686 }
687
688 /* ----------------------------------------------------------------------------
689  * Setting up the scheduler loop
690  * ------------------------------------------------------------------------- */
691
692 static void
693 schedulePreLoop(void)
694 {
695 #if defined(GRAN) 
696     /* set up first event to get things going */
697     /* ToDo: assign costs for system setup and init MainTSO ! */
698     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
699               ContinueThread, 
700               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
701     
702     IF_DEBUG(gran,
703              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
704                         CurrentTSO);
705              G_TSO(CurrentTSO, 5));
706     
707     if (RtsFlags.GranFlags.Light) {
708         /* Save current time; GranSim Light only */
709         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
710     }      
711 #endif
712 }
713
714 /* -----------------------------------------------------------------------------
715  * schedulePushWork()
716  *
717  * Push work to other Capabilities if we have some.
718  * -------------------------------------------------------------------------- */
719
720 #if defined(THREADED_RTS)
721 static void
722 schedulePushWork(Capability *cap USED_IF_THREADS, 
723                  Task *task      USED_IF_THREADS)
724 {
725     Capability *free_caps[n_capabilities], *cap0;
726     nat i, n_free_caps;
727
728     // Check whether we have more threads on our run queue, or sparks
729     // in our pool, that we could hand to another Capability.
730     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
731         && sparkPoolSizeCap(cap) < 2) {
732         return;
733     }
734
735     // First grab as many free Capabilities as we can.
736     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
737         cap0 = &capabilities[i];
738         if (cap != cap0 && tryGrabCapability(cap0,task)) {
739             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
740                 // it already has some work, we just grabbed it at 
741                 // the wrong moment.  Or maybe it's deadlocked!
742                 releaseCapability(cap0);
743             } else {
744                 free_caps[n_free_caps++] = cap0;
745             }
746         }
747     }
748
749     // we now have n_free_caps free capabilities stashed in
750     // free_caps[].  Share our run queue equally with them.  This is
751     // probably the simplest thing we could do; improvements we might
752     // want to do include:
753     //
754     //   - giving high priority to moving relatively new threads, on 
755     //     the gournds that they haven't had time to build up a
756     //     working set in the cache on this CPU/Capability.
757     //
758     //   - giving low priority to moving long-lived threads
759
760     if (n_free_caps > 0) {
761         StgTSO *prev, *t, *next;
762         rtsBool pushed_to_all;
763
764         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
765
766         i = 0;
767         pushed_to_all = rtsFalse;
768
769         if (cap->run_queue_hd != END_TSO_QUEUE) {
770             prev = cap->run_queue_hd;
771             t = prev->link;
772             prev->link = END_TSO_QUEUE;
773             for (; t != END_TSO_QUEUE; t = next) {
774                 next = t->link;
775                 t->link = END_TSO_QUEUE;
776                 if (t->what_next == ThreadRelocated
777                     || t->bound == task) { // don't move my bound thread
778                     prev->link = t;
779                     prev = t;
780                 } else if (i == n_free_caps) {
781                     pushed_to_all = rtsTrue;
782                     i = 0;
783                     // keep one for us
784                     prev->link = t;
785                     prev = t;
786                 } else {
787                     IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
788                     appendToRunQueue(free_caps[i],t);
789                     if (t->bound) { t->bound->cap = free_caps[i]; }
790                     i++;
791                 }
792             }
793             cap->run_queue_tl = prev;
794         }
795
796         // If there are some free capabilities that we didn't push any
797         // threads to, then try to push a spark to each one.
798         if (!pushed_to_all) {
799             StgClosure *spark;
800             // i is the next free capability to push to
801             for (; i < n_free_caps; i++) {
802                 if (emptySparkPoolCap(free_caps[i])) {
803                     spark = findSpark(cap);
804                     if (spark != NULL) {
805                         IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
806                         newSpark(&(free_caps[i]->r), spark);
807                     }
808                 }
809             }
810         }
811
812         // release the capabilities
813         for (i = 0; i < n_free_caps; i++) {
814             task->cap = free_caps[i];
815             releaseCapability(free_caps[i]);
816         }
817     }
818     task->cap = cap; // reset to point to our Capability.
819 }
820 #endif
821
822 /* ----------------------------------------------------------------------------
823  * Start any pending signal handlers
824  * ------------------------------------------------------------------------- */
825
826 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
827 static void
828 scheduleStartSignalHandlers(Capability *cap)
829 {
830     if (signals_pending()) { // safe outside the lock
831         startSignalHandlers(cap);
832     }
833 }
834 #else
835 static void
836 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
837 {
838 }
839 #endif
840
841 /* ----------------------------------------------------------------------------
842  * Check for blocked threads that can be woken up.
843  * ------------------------------------------------------------------------- */
844
845 static void
846 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
847 {
848 #if !defined(THREADED_RTS)
849     //
850     // Check whether any waiting threads need to be woken up.  If the
851     // run queue is empty, and there are no other tasks running, we
852     // can wait indefinitely for something to happen.
853     //
854     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
855     {
856         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
857     }
858 #endif
859 }
860
861
862 /* ----------------------------------------------------------------------------
863  * Check for threads blocked on BLACKHOLEs that can be woken up
864  * ------------------------------------------------------------------------- */
865 static void
866 scheduleCheckBlackHoles (Capability *cap)
867 {
868     if ( blackholes_need_checking ) // check without the lock first
869     {
870         ACQUIRE_LOCK(&sched_mutex);
871         if ( blackholes_need_checking ) {
872             checkBlackHoles(cap);
873             blackholes_need_checking = rtsFalse;
874         }
875         RELEASE_LOCK(&sched_mutex);
876     }
877 }
878
879 /* ----------------------------------------------------------------------------
880  * Detect deadlock conditions and attempt to resolve them.
881  * ------------------------------------------------------------------------- */
882
883 static void
884 scheduleDetectDeadlock (Capability *cap, Task *task)
885 {
886
887 #if defined(PARALLEL_HASKELL)
888     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
889     return;
890 #endif
891
892     /* 
893      * Detect deadlock: when we have no threads to run, there are no
894      * threads blocked, waiting for I/O, or sleeping, and all the
895      * other tasks are waiting for work, we must have a deadlock of
896      * some description.
897      */
898     if ( emptyThreadQueues(cap) )
899     {
900 #if defined(THREADED_RTS)
901         /* 
902          * In the threaded RTS, we only check for deadlock if there
903          * has been no activity in a complete timeslice.  This means
904          * we won't eagerly start a full GC just because we don't have
905          * any threads to run currently.
906          */
907         if (recent_activity != ACTIVITY_INACTIVE) return;
908 #endif
909
910         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
911
912         // Garbage collection can release some new threads due to
913         // either (a) finalizers or (b) threads resurrected because
914         // they are unreachable and will therefore be sent an
915         // exception.  Any threads thus released will be immediately
916         // runnable.
917         scheduleDoGC( cap, task, rtsTrue/*force  major GC*/, GetRoots );
918         recent_activity = ACTIVITY_DONE_GC;
919         
920         if ( !emptyRunQueue(cap) ) return;
921
922 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
923         /* If we have user-installed signal handlers, then wait
924          * for signals to arrive rather then bombing out with a
925          * deadlock.
926          */
927         if ( anyUserHandlers() ) {
928             IF_DEBUG(scheduler, 
929                      sched_belch("still deadlocked, waiting for signals..."));
930
931             awaitUserSignals();
932
933             if (signals_pending()) {
934                 startSignalHandlers(cap);
935             }
936
937             // either we have threads to run, or we were interrupted:
938             ASSERT(!emptyRunQueue(cap) || interrupted);
939         }
940 #endif
941
942 #if !defined(THREADED_RTS)
943         /* Probably a real deadlock.  Send the current main thread the
944          * Deadlock exception.
945          */
946         if (task->tso) {
947             switch (task->tso->why_blocked) {
948             case BlockedOnSTM:
949             case BlockedOnBlackHole:
950             case BlockedOnException:
951             case BlockedOnMVar:
952                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
953                 return;
954             default:
955                 barf("deadlock: main thread blocked in a strange way");
956             }
957         }
958         return;
959 #endif
960     }
961 }
962
963 /* ----------------------------------------------------------------------------
964  * Process an event (GRAN only)
965  * ------------------------------------------------------------------------- */
966
967 #if defined(GRAN)
968 static StgTSO *
969 scheduleProcessEvent(rtsEvent *event)
970 {
971     StgTSO *t;
972
973     if (RtsFlags.GranFlags.Light)
974       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
975
976     /* adjust time based on time-stamp */
977     if (event->time > CurrentTime[CurrentProc] &&
978         event->evttype != ContinueThread)
979       CurrentTime[CurrentProc] = event->time;
980     
981     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
982     if (!RtsFlags.GranFlags.Light)
983       handleIdlePEs();
984
985     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
986
987     /* main event dispatcher in GranSim */
988     switch (event->evttype) {
989       /* Should just be continuing execution */
990     case ContinueThread:
991       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
992       /* ToDo: check assertion
993       ASSERT(run_queue_hd != (StgTSO*)NULL &&
994              run_queue_hd != END_TSO_QUEUE);
995       */
996       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
997       if (!RtsFlags.GranFlags.DoAsyncFetch &&
998           procStatus[CurrentProc]==Fetching) {
999         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1000               CurrentTSO->id, CurrentTSO, CurrentProc);
1001         goto next_thread;
1002       } 
1003       /* Ignore ContinueThreads for completed threads */
1004       if (CurrentTSO->what_next == ThreadComplete) {
1005         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1006               CurrentTSO->id, CurrentTSO, CurrentProc);
1007         goto next_thread;
1008       } 
1009       /* Ignore ContinueThreads for threads that are being migrated */
1010       if (PROCS(CurrentTSO)==Nowhere) { 
1011         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1012               CurrentTSO->id, CurrentTSO, CurrentProc);
1013         goto next_thread;
1014       }
1015       /* The thread should be at the beginning of the run queue */
1016       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1017         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1018               CurrentTSO->id, CurrentTSO, CurrentProc);
1019         break; // run the thread anyway
1020       }
1021       /*
1022       new_event(proc, proc, CurrentTime[proc],
1023                 FindWork,
1024                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1025       goto next_thread; 
1026       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1027       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1028
1029     case FetchNode:
1030       do_the_fetchnode(event);
1031       goto next_thread;             /* handle next event in event queue  */
1032       
1033     case GlobalBlock:
1034       do_the_globalblock(event);
1035       goto next_thread;             /* handle next event in event queue  */
1036       
1037     case FetchReply:
1038       do_the_fetchreply(event);
1039       goto next_thread;             /* handle next event in event queue  */
1040       
1041     case UnblockThread:   /* Move from the blocked queue to the tail of */
1042       do_the_unblock(event);
1043       goto next_thread;             /* handle next event in event queue  */
1044       
1045     case ResumeThread:  /* Move from the blocked queue to the tail of */
1046       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1047       event->tso->gran.blocktime += 
1048         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1049       do_the_startthread(event);
1050       goto next_thread;             /* handle next event in event queue  */
1051       
1052     case StartThread:
1053       do_the_startthread(event);
1054       goto next_thread;             /* handle next event in event queue  */
1055       
1056     case MoveThread:
1057       do_the_movethread(event);
1058       goto next_thread;             /* handle next event in event queue  */
1059       
1060     case MoveSpark:
1061       do_the_movespark(event);
1062       goto next_thread;             /* handle next event in event queue  */
1063       
1064     case FindWork:
1065       do_the_findwork(event);
1066       goto next_thread;             /* handle next event in event queue  */
1067       
1068     default:
1069       barf("Illegal event type %u\n", event->evttype);
1070     }  /* switch */
1071     
1072     /* This point was scheduler_loop in the old RTS */
1073
1074     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1075
1076     TimeOfLastEvent = CurrentTime[CurrentProc];
1077     TimeOfNextEvent = get_time_of_next_event();
1078     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1079     // CurrentTSO = ThreadQueueHd;
1080
1081     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1082                          TimeOfNextEvent));
1083
1084     if (RtsFlags.GranFlags.Light) 
1085       GranSimLight_leave_system(event, &ActiveTSO); 
1086
1087     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1088
1089     IF_DEBUG(gran, 
1090              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1091
1092     /* in a GranSim setup the TSO stays on the run queue */
1093     t = CurrentTSO;
1094     /* Take a thread from the run queue. */
1095     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1096
1097     IF_DEBUG(gran, 
1098              debugBelch("GRAN: About to run current thread, which is\n");
1099              G_TSO(t,5));
1100
1101     context_switch = 0; // turned on via GranYield, checking events and time slice
1102
1103     IF_DEBUG(gran, 
1104              DumpGranEvent(GR_SCHEDULE, t));
1105
1106     procStatus[CurrentProc] = Busy;
1107 }
1108 #endif // GRAN
1109
1110 /* ----------------------------------------------------------------------------
1111  * Send pending messages (PARALLEL_HASKELL only)
1112  * ------------------------------------------------------------------------- */
1113
1114 #if defined(PARALLEL_HASKELL)
1115 static StgTSO *
1116 scheduleSendPendingMessages(void)
1117 {
1118     StgSparkPool *pool;
1119     rtsSpark spark;
1120     StgTSO *t;
1121
1122 # if defined(PAR) // global Mem.Mgmt., omit for now
1123     if (PendingFetches != END_BF_QUEUE) {
1124         processFetches();
1125     }
1126 # endif
1127     
1128     if (RtsFlags.ParFlags.BufferTime) {
1129         // if we use message buffering, we must send away all message
1130         // packets which have become too old...
1131         sendOldBuffers(); 
1132     }
1133 }
1134 #endif
1135
1136 /* ----------------------------------------------------------------------------
1137  * Activate spark threads (PARALLEL_HASKELL only)
1138  * ------------------------------------------------------------------------- */
1139
1140 #if defined(PARALLEL_HASKELL)
1141 static void
1142 scheduleActivateSpark(void)
1143 {
1144 #if defined(SPARKS)
1145   ASSERT(emptyRunQueue());
1146 /* We get here if the run queue is empty and want some work.
1147    We try to turn a spark into a thread, and add it to the run queue,
1148    from where it will be picked up in the next iteration of the scheduler
1149    loop.
1150 */
1151
1152       /* :-[  no local threads => look out for local sparks */
1153       /* the spark pool for the current PE */
1154       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1155       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1156           pool->hd < pool->tl) {
1157         /* 
1158          * ToDo: add GC code check that we really have enough heap afterwards!!
1159          * Old comment:
1160          * If we're here (no runnable threads) and we have pending
1161          * sparks, we must have a space problem.  Get enough space
1162          * to turn one of those pending sparks into a
1163          * thread... 
1164          */
1165
1166         spark = findSpark(rtsFalse);            /* get a spark */
1167         if (spark != (rtsSpark) NULL) {
1168           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1169           IF_PAR_DEBUG(fish, // schedule,
1170                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1171                              tso->id, tso, advisory_thread_count));
1172
1173           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1174             IF_PAR_DEBUG(fish, // schedule,
1175                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1176                             spark));
1177             return rtsFalse; /* failed to generate a thread */
1178           }                  /* otherwise fall through & pick-up new tso */
1179         } else {
1180           IF_PAR_DEBUG(fish, // schedule,
1181                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1182                              spark_queue_len(pool)));
1183           return rtsFalse;  /* failed to generate a thread */
1184         }
1185         return rtsTrue;  /* success in generating a thread */
1186   } else { /* no more threads permitted or pool empty */
1187     return rtsFalse;  /* failed to generateThread */
1188   }
1189 #else
1190   tso = NULL; // avoid compiler warning only
1191   return rtsFalse;  /* dummy in non-PAR setup */
1192 #endif // SPARKS
1193 }
1194 #endif // PARALLEL_HASKELL
1195
1196 /* ----------------------------------------------------------------------------
1197  * Get work from a remote node (PARALLEL_HASKELL only)
1198  * ------------------------------------------------------------------------- */
1199     
1200 #if defined(PARALLEL_HASKELL)
1201 static rtsBool
1202 scheduleGetRemoteWork(rtsBool *receivedFinish)
1203 {
1204   ASSERT(emptyRunQueue());
1205
1206   if (RtsFlags.ParFlags.BufferTime) {
1207         IF_PAR_DEBUG(verbose, 
1208                 debugBelch("...send all pending data,"));
1209         {
1210           nat i;
1211           for (i=1; i<=nPEs; i++)
1212             sendImmediately(i); // send all messages away immediately
1213         }
1214   }
1215 # ifndef SPARKS
1216         //++EDEN++ idle() , i.e. send all buffers, wait for work
1217         // suppress fishing in EDEN... just look for incoming messages
1218         // (blocking receive)
1219   IF_PAR_DEBUG(verbose, 
1220                debugBelch("...wait for incoming messages...\n"));
1221   *receivedFinish = processMessages(); // blocking receive...
1222
1223         // and reenter scheduling loop after having received something
1224         // (return rtsFalse below)
1225
1226 # else /* activate SPARKS machinery */
1227 /* We get here, if we have no work, tried to activate a local spark, but still
1228    have no work. We try to get a remote spark, by sending a FISH message.
1229    Thread migration should be added here, and triggered when a sequence of 
1230    fishes returns without work. */
1231         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1232
1233       /* =8-[  no local sparks => look for work on other PEs */
1234         /*
1235          * We really have absolutely no work.  Send out a fish
1236          * (there may be some out there already), and wait for
1237          * something to arrive.  We clearly can't run any threads
1238          * until a SCHEDULE or RESUME arrives, and so that's what
1239          * we're hoping to see.  (Of course, we still have to
1240          * respond to other types of messages.)
1241          */
1242         rtsTime now = msTime() /*CURRENT_TIME*/;
1243         IF_PAR_DEBUG(verbose, 
1244                      debugBelch("--  now=%ld\n", now));
1245         IF_PAR_DEBUG(fish, // verbose,
1246              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1247                  (last_fish_arrived_at!=0 &&
1248                   last_fish_arrived_at+delay > now)) {
1249                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1250                      now, last_fish_arrived_at+delay, 
1251                      last_fish_arrived_at,
1252                      delay);
1253              });
1254   
1255         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1256             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1257           if (last_fish_arrived_at==0 ||
1258               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1259             /* outstandingFishes is set in sendFish, processFish;
1260                avoid flooding system with fishes via delay */
1261     next_fish_to_send_at = 0;  
1262   } else {
1263     /* ToDo: this should be done in the main scheduling loop to avoid the
1264              busy wait here; not so bad if fish delay is very small  */
1265     int iq = 0; // DEBUGGING -- HWL
1266     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1267     /* send a fish when ready, but process messages that arrive in the meantime */
1268     do {
1269       if (PacketsWaiting()) {
1270         iq++; // DEBUGGING
1271         *receivedFinish = processMessages();
1272       }
1273       now = msTime();
1274     } while (!*receivedFinish || now<next_fish_to_send_at);
1275     // JB: This means the fish could become obsolete, if we receive
1276     // work. Better check for work again? 
1277     // last line: while (!receivedFinish || !haveWork || now<...)
1278     // next line: if (receivedFinish || haveWork )
1279
1280     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1281       return rtsFalse;  // NB: this will leave scheduler loop
1282                         // immediately after return!
1283                           
1284     IF_PAR_DEBUG(fish, // verbose,
1285                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1286
1287   }
1288
1289     // JB: IMHO, this should all be hidden inside sendFish(...)
1290     /* pe = choosePE(); 
1291        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1292                 NEW_FISH_HUNGER);
1293
1294     // Global statistics: count no. of fishes
1295     if (RtsFlags.ParFlags.ParStats.Global &&
1296          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1297            globalParStats.tot_fish_mess++;
1298            }
1299     */ 
1300
1301   /* delayed fishes must have been sent by now! */
1302   next_fish_to_send_at = 0;  
1303   }
1304       
1305   *receivedFinish = processMessages();
1306 # endif /* SPARKS */
1307
1308  return rtsFalse;
1309  /* NB: this function always returns rtsFalse, meaning the scheduler
1310     loop continues with the next iteration; 
1311     rationale: 
1312       return code means success in finding work; we enter this function
1313       if there is no local work, thus have to send a fish which takes
1314       time until it arrives with work; in the meantime we should process
1315       messages in the main loop;
1316  */
1317 }
1318 #endif // PARALLEL_HASKELL
1319
1320 /* ----------------------------------------------------------------------------
1321  * PAR/GRAN: Report stats & debugging info(?)
1322  * ------------------------------------------------------------------------- */
1323
1324 #if defined(PAR) || defined(GRAN)
1325 static void
1326 scheduleGranParReport(void)
1327 {
1328   ASSERT(run_queue_hd != END_TSO_QUEUE);
1329
1330   /* Take a thread from the run queue, if we have work */
1331   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1332
1333     /* If this TSO has got its outport closed in the meantime, 
1334      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1335      * It has to be marked as TH_DEAD for this purpose.
1336      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1337
1338 JB: TODO: investigate wether state change field could be nuked
1339      entirely and replaced by the normal tso state (whatnext
1340      field). All we want to do is to kill tsos from outside.
1341      */
1342
1343     /* ToDo: write something to the log-file
1344     if (RTSflags.ParFlags.granSimStats && !sameThread)
1345         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1346
1347     CurrentTSO = t;
1348     */
1349     /* the spark pool for the current PE */
1350     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1351
1352     IF_DEBUG(scheduler, 
1353              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1354                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1355
1356     IF_PAR_DEBUG(fish,
1357              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1358                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1359
1360     if (RtsFlags.ParFlags.ParStats.Full && 
1361         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1362         (emitSchedule || // forced emit
1363          (t && LastTSO && t->id != LastTSO->id))) {
1364       /* 
1365          we are running a different TSO, so write a schedule event to log file
1366          NB: If we use fair scheduling we also have to write  a deschedule 
1367              event for LastTSO; with unfair scheduling we know that the
1368              previous tso has blocked whenever we switch to another tso, so
1369              we don't need it in GUM for now
1370       */
1371       IF_PAR_DEBUG(fish, // schedule,
1372                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1373
1374       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1375                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1376       emitSchedule = rtsFalse;
1377     }
1378 }     
1379 #endif
1380
1381 /* ----------------------------------------------------------------------------
1382  * After running a thread...
1383  * ------------------------------------------------------------------------- */
1384
1385 static void
1386 schedulePostRunThread(void)
1387 {
1388 #if defined(PAR)
1389     /* HACK 675: if the last thread didn't yield, make sure to print a 
1390        SCHEDULE event to the log file when StgRunning the next thread, even
1391        if it is the same one as before */
1392     LastTSO = t; 
1393     TimeOfLastYield = CURRENT_TIME;
1394 #endif
1395
1396   /* some statistics gathering in the parallel case */
1397
1398 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1399   switch (ret) {
1400     case HeapOverflow:
1401 # if defined(GRAN)
1402       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1403       globalGranStats.tot_heapover++;
1404 # elif defined(PAR)
1405       globalParStats.tot_heapover++;
1406 # endif
1407       break;
1408
1409      case StackOverflow:
1410 # if defined(GRAN)
1411       IF_DEBUG(gran, 
1412                DumpGranEvent(GR_DESCHEDULE, t));
1413       globalGranStats.tot_stackover++;
1414 # elif defined(PAR)
1415       // IF_DEBUG(par, 
1416       // DumpGranEvent(GR_DESCHEDULE, t);
1417       globalParStats.tot_stackover++;
1418 # endif
1419       break;
1420
1421     case ThreadYielding:
1422 # if defined(GRAN)
1423       IF_DEBUG(gran, 
1424                DumpGranEvent(GR_DESCHEDULE, t));
1425       globalGranStats.tot_yields++;
1426 # elif defined(PAR)
1427       // IF_DEBUG(par, 
1428       // DumpGranEvent(GR_DESCHEDULE, t);
1429       globalParStats.tot_yields++;
1430 # endif
1431       break; 
1432
1433     case ThreadBlocked:
1434 # if defined(GRAN)
1435       IF_DEBUG(scheduler,
1436                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1437                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1438                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1439                if (t->block_info.closure!=(StgClosure*)NULL)
1440                  print_bq(t->block_info.closure);
1441                debugBelch("\n"));
1442
1443       // ??? needed; should emit block before
1444       IF_DEBUG(gran, 
1445                DumpGranEvent(GR_DESCHEDULE, t)); 
1446       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1447       /*
1448         ngoq Dogh!
1449       ASSERT(procStatus[CurrentProc]==Busy || 
1450               ((procStatus[CurrentProc]==Fetching) && 
1451               (t->block_info.closure!=(StgClosure*)NULL)));
1452       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1453           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1454             procStatus[CurrentProc]==Fetching)) 
1455         procStatus[CurrentProc] = Idle;
1456       */
1457 # elif defined(PAR)
1458 //++PAR++  blockThread() writes the event (change?)
1459 # endif
1460     break;
1461
1462   case ThreadFinished:
1463     break;
1464
1465   default:
1466     barf("parGlobalStats: unknown return code");
1467     break;
1468     }
1469 #endif
1470 }
1471
1472 /* -----------------------------------------------------------------------------
1473  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1474  * -------------------------------------------------------------------------- */
1475
1476 static rtsBool
1477 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1478 {
1479     // did the task ask for a large block?
1480     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1481         // if so, get one and push it on the front of the nursery.
1482         bdescr *bd;
1483         lnat blocks;
1484         
1485         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1486         
1487         IF_DEBUG(scheduler,
1488                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1489                             (long)t->id, whatNext_strs[t->what_next], blocks));
1490         
1491         // don't do this if the nursery is (nearly) full, we'll GC first.
1492         if (cap->r.rCurrentNursery->link != NULL ||
1493             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1494                                                // if the nursery has only one block.
1495             
1496             ACQUIRE_SM_LOCK
1497             bd = allocGroup( blocks );
1498             RELEASE_SM_LOCK
1499             cap->r.rNursery->n_blocks += blocks;
1500             
1501             // link the new group into the list
1502             bd->link = cap->r.rCurrentNursery;
1503             bd->u.back = cap->r.rCurrentNursery->u.back;
1504             if (cap->r.rCurrentNursery->u.back != NULL) {
1505                 cap->r.rCurrentNursery->u.back->link = bd;
1506             } else {
1507 #if !defined(THREADED_RTS)
1508                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1509                        g0s0 == cap->r.rNursery);
1510 #endif
1511                 cap->r.rNursery->blocks = bd;
1512             }             
1513             cap->r.rCurrentNursery->u.back = bd;
1514             
1515             // initialise it as a nursery block.  We initialise the
1516             // step, gen_no, and flags field of *every* sub-block in
1517             // this large block, because this is easier than making
1518             // sure that we always find the block head of a large
1519             // block whenever we call Bdescr() (eg. evacuate() and
1520             // isAlive() in the GC would both have to do this, at
1521             // least).
1522             { 
1523                 bdescr *x;
1524                 for (x = bd; x < bd + blocks; x++) {
1525                     x->step = cap->r.rNursery;
1526                     x->gen_no = 0;
1527                     x->flags = 0;
1528                 }
1529             }
1530             
1531             // This assert can be a killer if the app is doing lots
1532             // of large block allocations.
1533             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1534             
1535             // now update the nursery to point to the new block
1536             cap->r.rCurrentNursery = bd;
1537             
1538             // we might be unlucky and have another thread get on the
1539             // run queue before us and steal the large block, but in that
1540             // case the thread will just end up requesting another large
1541             // block.
1542             pushOnRunQueue(cap,t);
1543             return rtsFalse;  /* not actually GC'ing */
1544         }
1545     }
1546     
1547     IF_DEBUG(scheduler,
1548              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1549                         (long)t->id, whatNext_strs[t->what_next]));
1550 #if defined(GRAN)
1551     ASSERT(!is_on_queue(t,CurrentProc));
1552 #elif defined(PARALLEL_HASKELL)
1553     /* Currently we emit a DESCHEDULE event before GC in GUM.
1554        ToDo: either add separate event to distinguish SYSTEM time from rest
1555        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1556     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1557         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1558                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1559         emitSchedule = rtsTrue;
1560     }
1561 #endif
1562       
1563     pushOnRunQueue(cap,t);
1564     return rtsTrue;
1565     /* actual GC is done at the end of the while loop in schedule() */
1566 }
1567
1568 /* -----------------------------------------------------------------------------
1569  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1570  * -------------------------------------------------------------------------- */
1571
1572 static void
1573 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1574 {
1575     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1576                                   (long)t->id, whatNext_strs[t->what_next]));
1577     /* just adjust the stack for this thread, then pop it back
1578      * on the run queue.
1579      */
1580     { 
1581         /* enlarge the stack */
1582         StgTSO *new_t = threadStackOverflow(cap, t);
1583         
1584         /* The TSO attached to this Task may have moved, so update the
1585          * pointer to it.
1586          */
1587         if (task->tso == t) {
1588             task->tso = new_t;
1589         }
1590         pushOnRunQueue(cap,new_t);
1591     }
1592 }
1593
1594 /* -----------------------------------------------------------------------------
1595  * Handle a thread that returned to the scheduler with ThreadYielding
1596  * -------------------------------------------------------------------------- */
1597
1598 static rtsBool
1599 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1600 {
1601     // Reset the context switch flag.  We don't do this just before
1602     // running the thread, because that would mean we would lose ticks
1603     // during GC, which can lead to unfair scheduling (a thread hogs
1604     // the CPU because the tick always arrives during GC).  This way
1605     // penalises threads that do a lot of allocation, but that seems
1606     // better than the alternative.
1607     context_switch = 0;
1608     
1609     /* put the thread back on the run queue.  Then, if we're ready to
1610      * GC, check whether this is the last task to stop.  If so, wake
1611      * up the GC thread.  getThread will block during a GC until the
1612      * GC is finished.
1613      */
1614     IF_DEBUG(scheduler,
1615              if (t->what_next != prev_what_next) {
1616                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1617                             (long)t->id, whatNext_strs[t->what_next]);
1618              } else {
1619                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1620                             (long)t->id, whatNext_strs[t->what_next]);
1621              }
1622         );
1623     
1624     IF_DEBUG(sanity,
1625              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1626              checkTSO(t));
1627     ASSERT(t->link == END_TSO_QUEUE);
1628     
1629     // Shortcut if we're just switching evaluators: don't bother
1630     // doing stack squeezing (which can be expensive), just run the
1631     // thread.
1632     if (t->what_next != prev_what_next) {
1633         return rtsTrue;
1634     }
1635     
1636 #if defined(GRAN)
1637     ASSERT(!is_on_queue(t,CurrentProc));
1638       
1639     IF_DEBUG(sanity,
1640              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1641              checkThreadQsSanity(rtsTrue));
1642
1643 #endif
1644
1645     addToRunQueue(cap,t);
1646
1647 #if defined(GRAN)
1648     /* add a ContinueThread event to actually process the thread */
1649     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1650               ContinueThread,
1651               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1652     IF_GRAN_DEBUG(bq, 
1653                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1654                   G_EVENTQ(0);
1655                   G_CURR_THREADQ(0));
1656 #endif
1657     return rtsFalse;
1658 }
1659
1660 /* -----------------------------------------------------------------------------
1661  * Handle a thread that returned to the scheduler with ThreadBlocked
1662  * -------------------------------------------------------------------------- */
1663
1664 static void
1665 scheduleHandleThreadBlocked( StgTSO *t
1666 #if !defined(GRAN) && !defined(DEBUG)
1667     STG_UNUSED
1668 #endif
1669     )
1670 {
1671 #if defined(GRAN)
1672     IF_DEBUG(scheduler,
1673              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1674                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1675              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1676     
1677     // ??? needed; should emit block before
1678     IF_DEBUG(gran, 
1679              DumpGranEvent(GR_DESCHEDULE, t)); 
1680     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1681     /*
1682       ngoq Dogh!
1683       ASSERT(procStatus[CurrentProc]==Busy || 
1684       ((procStatus[CurrentProc]==Fetching) && 
1685       (t->block_info.closure!=(StgClosure*)NULL)));
1686       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1687       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1688       procStatus[CurrentProc]==Fetching)) 
1689       procStatus[CurrentProc] = Idle;
1690     */
1691 #elif defined(PAR)
1692     IF_DEBUG(scheduler,
1693              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1694                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1695     IF_PAR_DEBUG(bq,
1696                  
1697                  if (t->block_info.closure!=(StgClosure*)NULL) 
1698                  print_bq(t->block_info.closure));
1699     
1700     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1701     blockThread(t);
1702     
1703     /* whatever we schedule next, we must log that schedule */
1704     emitSchedule = rtsTrue;
1705     
1706 #else /* !GRAN */
1707
1708       // We don't need to do anything.  The thread is blocked, and it
1709       // has tidied up its stack and placed itself on whatever queue
1710       // it needs to be on.
1711
1712 #if !defined(THREADED_RTS)
1713     ASSERT(t->why_blocked != NotBlocked);
1714              // This might not be true under THREADED_RTS: we don't have
1715              // exclusive access to this TSO, so someone might have
1716              // woken it up by now.  This actually happens: try
1717              // conc023 +RTS -N2.
1718 #endif
1719
1720     IF_DEBUG(scheduler,
1721              debugBelch("--<< thread %d (%s) stopped: ", 
1722                         t->id, whatNext_strs[t->what_next]);
1723              printThreadBlockage(t);
1724              debugBelch("\n"));
1725     
1726     /* Only for dumping event to log file 
1727        ToDo: do I need this in GranSim, too?
1728        blockThread(t);
1729     */
1730 #endif
1731 }
1732
1733 /* -----------------------------------------------------------------------------
1734  * Handle a thread that returned to the scheduler with ThreadFinished
1735  * -------------------------------------------------------------------------- */
1736
1737 static rtsBool
1738 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1739 {
1740     /* Need to check whether this was a main thread, and if so,
1741      * return with the return value.
1742      *
1743      * We also end up here if the thread kills itself with an
1744      * uncaught exception, see Exception.cmm.
1745      */
1746     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1747                                   t->id, whatNext_strs[t->what_next]));
1748
1749 #if defined(GRAN)
1750       endThread(t, CurrentProc); // clean-up the thread
1751 #elif defined(PARALLEL_HASKELL)
1752       /* For now all are advisory -- HWL */
1753       //if(t->priority==AdvisoryPriority) ??
1754       advisory_thread_count--; // JB: Caution with this counter, buggy!
1755       
1756 # if defined(DIST)
1757       if(t->dist.priority==RevalPriority)
1758         FinishReval(t);
1759 # endif
1760     
1761 # if defined(EDENOLD)
1762       // the thread could still have an outport... (BUG)
1763       if (t->eden.outport != -1) {
1764       // delete the outport for the tso which has finished...
1765         IF_PAR_DEBUG(eden_ports,
1766                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1767                               t->eden.outport, t->id));
1768         deleteOPT(t);
1769       }
1770       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1771       if (t->eden.epid != -1) {
1772         IF_PAR_DEBUG(eden_ports,
1773                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1774                            t->id, t->eden.epid));
1775         removeTSOfromProcess(t);
1776       }
1777 # endif 
1778
1779 # if defined(PAR)
1780       if (RtsFlags.ParFlags.ParStats.Full &&
1781           !RtsFlags.ParFlags.ParStats.Suppressed) 
1782         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1783
1784       //  t->par only contains statistics: left out for now...
1785       IF_PAR_DEBUG(fish,
1786                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1787                               t->id,t,t->par.sparkname));
1788 # endif
1789 #endif // PARALLEL_HASKELL
1790
1791       //
1792       // Check whether the thread that just completed was a bound
1793       // thread, and if so return with the result.  
1794       //
1795       // There is an assumption here that all thread completion goes
1796       // through this point; we need to make sure that if a thread
1797       // ends up in the ThreadKilled state, that it stays on the run
1798       // queue so it can be dealt with here.
1799       //
1800
1801       if (t->bound) {
1802
1803           if (t->bound != task) {
1804 #if !defined(THREADED_RTS)
1805               // Must be a bound thread that is not the topmost one.  Leave
1806               // it on the run queue until the stack has unwound to the
1807               // point where we can deal with this.  Leaving it on the run
1808               // queue also ensures that the garbage collector knows about
1809               // this thread and its return value (it gets dropped from the
1810               // all_threads list so there's no other way to find it).
1811               appendToRunQueue(cap,t);
1812               return rtsFalse;
1813 #else
1814               // this cannot happen in the threaded RTS, because a
1815               // bound thread can only be run by the appropriate Task.
1816               barf("finished bound thread that isn't mine");
1817 #endif
1818           }
1819
1820           ASSERT(task->tso == t);
1821
1822           if (t->what_next == ThreadComplete) {
1823               if (task->ret) {
1824                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1825                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1826               }
1827               task->stat = Success;
1828           } else {
1829               if (task->ret) {
1830                   *(task->ret) = NULL;
1831               }
1832               if (interrupted) {
1833                   task->stat = Interrupted;
1834               } else {
1835                   task->stat = Killed;
1836               }
1837           }
1838 #ifdef DEBUG
1839           removeThreadLabel((StgWord)task->tso->id);
1840 #endif
1841           return rtsTrue; // tells schedule() to return
1842       }
1843
1844       return rtsFalse;
1845 }
1846
1847 /* -----------------------------------------------------------------------------
1848  * Perform a heap census, if PROFILING
1849  * -------------------------------------------------------------------------- */
1850
1851 static rtsBool
1852 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1853 {
1854 #if defined(PROFILING)
1855     // When we have +RTS -i0 and we're heap profiling, do a census at
1856     // every GC.  This lets us get repeatable runs for debugging.
1857     if (performHeapProfile ||
1858         (RtsFlags.ProfFlags.profileInterval==0 &&
1859          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1860         GarbageCollect(GetRoots, rtsTrue);
1861         heapCensus();
1862         performHeapProfile = rtsFalse;
1863         return rtsTrue;  // true <=> we already GC'd
1864     }
1865 #endif
1866     return rtsFalse;
1867 }
1868
1869 /* -----------------------------------------------------------------------------
1870  * Perform a garbage collection if necessary
1871  * -------------------------------------------------------------------------- */
1872
1873 static void
1874 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
1875               rtsBool force_major, void (*get_roots)(evac_fn))
1876 {
1877     StgTSO *t;
1878 #ifdef THREADED_RTS
1879     static volatile StgWord waiting_for_gc;
1880     rtsBool was_waiting;
1881     nat i;
1882 #endif
1883
1884 #ifdef THREADED_RTS
1885     // In order to GC, there must be no threads running Haskell code.
1886     // Therefore, the GC thread needs to hold *all* the capabilities,
1887     // and release them after the GC has completed.  
1888     //
1889     // This seems to be the simplest way: previous attempts involved
1890     // making all the threads with capabilities give up their
1891     // capabilities and sleep except for the *last* one, which
1892     // actually did the GC.  But it's quite hard to arrange for all
1893     // the other tasks to sleep and stay asleep.
1894     //
1895         
1896     was_waiting = cas(&waiting_for_gc, 0, 1);
1897     if (was_waiting) {
1898         do {
1899             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1900             if (cap) yieldCapability(&cap,task);
1901         } while (waiting_for_gc);
1902         return;
1903     }
1904
1905     for (i=0; i < n_capabilities; i++) {
1906         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1907         if (cap != &capabilities[i]) {
1908             Capability *pcap = &capabilities[i];
1909             // we better hope this task doesn't get migrated to
1910             // another Capability while we're waiting for this one.
1911             // It won't, because load balancing happens while we have
1912             // all the Capabilities, but even so it's a slightly
1913             // unsavoury invariant.
1914             task->cap = pcap;
1915             context_switch = 1;
1916             waitForReturnCapability(&pcap, task);
1917             if (pcap != &capabilities[i]) {
1918                 barf("scheduleDoGC: got the wrong capability");
1919             }
1920         }
1921     }
1922
1923     waiting_for_gc = rtsFalse;
1924 #endif
1925
1926     /* Kick any transactions which are invalid back to their
1927      * atomically frames.  When next scheduled they will try to
1928      * commit, this commit will fail and they will retry.
1929      */
1930     { 
1931         StgTSO *next;
1932
1933         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1934             if (t->what_next == ThreadRelocated) {
1935                 next = t->link;
1936             } else {
1937                 next = t->global_link;
1938                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1939                     if (!stmValidateNestOfTransactions (t -> trec)) {
1940                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1941                         
1942                         // strip the stack back to the
1943                         // ATOMICALLY_FRAME, aborting the (nested)
1944                         // transaction, and saving the stack of any
1945                         // partially-evaluated thunks on the heap.
1946                         raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL);
1947                         
1948 #ifdef REG_R1
1949                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1950 #endif
1951                     }
1952                 }
1953             }
1954         }
1955     }
1956     
1957     // so this happens periodically:
1958     if (cap) scheduleCheckBlackHoles(cap);
1959     
1960     IF_DEBUG(scheduler, printAllThreads());
1961
1962     /* everybody back, start the GC.
1963      * Could do it in this thread, or signal a condition var
1964      * to do it in another thread.  Either way, we need to
1965      * broadcast on gc_pending_cond afterward.
1966      */
1967 #if defined(THREADED_RTS)
1968     IF_DEBUG(scheduler,sched_belch("doing GC"));
1969 #endif
1970     GarbageCollect(get_roots, force_major);
1971     
1972 #if defined(THREADED_RTS)
1973     // release our stash of capabilities.
1974     for (i = 0; i < n_capabilities; i++) {
1975         if (cap != &capabilities[i]) {
1976             task->cap = &capabilities[i];
1977             releaseCapability(&capabilities[i]);
1978         }
1979     }
1980     if (cap) {
1981         task->cap = cap;
1982     } else {
1983         task->cap = NULL;
1984     }
1985 #endif
1986
1987 #if defined(GRAN)
1988     /* add a ContinueThread event to continue execution of current thread */
1989     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1990               ContinueThread,
1991               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1992     IF_GRAN_DEBUG(bq, 
1993                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1994                   G_EVENTQ(0);
1995                   G_CURR_THREADQ(0));
1996 #endif /* GRAN */
1997 }
1998
1999 /* ---------------------------------------------------------------------------
2000  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
2001  * used by Control.Concurrent for error checking.
2002  * ------------------------------------------------------------------------- */
2003  
2004 StgBool
2005 rtsSupportsBoundThreads(void)
2006 {
2007 #if defined(THREADED_RTS)
2008   return rtsTrue;
2009 #else
2010   return rtsFalse;
2011 #endif
2012 }
2013
2014 /* ---------------------------------------------------------------------------
2015  * isThreadBound(tso): check whether tso is bound to an OS thread.
2016  * ------------------------------------------------------------------------- */
2017  
2018 StgBool
2019 isThreadBound(StgTSO* tso USED_IF_THREADS)
2020 {
2021 #if defined(THREADED_RTS)
2022   return (tso->bound != NULL);
2023 #endif
2024   return rtsFalse;
2025 }
2026
2027 /* ---------------------------------------------------------------------------
2028  * Singleton fork(). Do not copy any running threads.
2029  * ------------------------------------------------------------------------- */
2030
2031 #if !defined(mingw32_HOST_OS)
2032 #define FORKPROCESS_PRIMOP_SUPPORTED
2033 #endif
2034
2035 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2036 static void 
2037 deleteThreadImmediately(Capability *cap, StgTSO *tso);
2038 #endif
2039 StgInt
2040 forkProcess(HsStablePtr *entry
2041 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2042             STG_UNUSED
2043 #endif
2044            )
2045 {
2046 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2047     Task *task;
2048     pid_t pid;
2049     StgTSO* t,*next;
2050     Capability *cap;
2051     
2052 #if defined(THREADED_RTS)
2053     if (RtsFlags.ParFlags.nNodes > 1) {
2054         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2055         stg_exit(EXIT_FAILURE);
2056     }
2057 #endif
2058
2059     IF_DEBUG(scheduler,sched_belch("forking!"));
2060     
2061     // ToDo: for SMP, we should probably acquire *all* the capabilities
2062     cap = rts_lock();
2063     
2064     pid = fork();
2065     
2066     if (pid) { // parent
2067         
2068         // just return the pid
2069         rts_unlock(cap);
2070         return pid;
2071         
2072     } else { // child
2073         
2074         // delete all threads
2075         cap->run_queue_hd = END_TSO_QUEUE;
2076         cap->run_queue_tl = END_TSO_QUEUE;
2077         
2078         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2079             next = t->link;
2080             
2081             // don't allow threads to catch the ThreadKilled exception
2082             deleteThreadImmediately(cap,t);
2083         }
2084         
2085         // wipe the task list
2086         ACQUIRE_LOCK(&sched_mutex);
2087         for (task = all_tasks; task != NULL; task=task->all_link) {
2088             if (task != cap->running_task) discardTask(task);
2089         }
2090         RELEASE_LOCK(&sched_mutex);
2091
2092         cap->suspended_ccalling_tasks = NULL;
2093
2094 #if defined(THREADED_RTS)
2095         // wipe our spare workers list.
2096         cap->spare_workers = NULL;
2097         cap->returning_tasks_hd = NULL;
2098         cap->returning_tasks_tl = NULL;
2099 #endif
2100
2101         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2102         rts_checkSchedStatus("forkProcess",cap);
2103         
2104         rts_unlock(cap);
2105         hs_exit();                      // clean up and exit
2106         stg_exit(EXIT_SUCCESS);
2107     }
2108 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2109     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2110     return -1;
2111 #endif
2112 }
2113
2114 /* ---------------------------------------------------------------------------
2115  * Delete the threads on the run queue of the current capability.
2116  * ------------------------------------------------------------------------- */
2117    
2118 static void
2119 deleteRunQueue (Capability *cap)
2120 {
2121     StgTSO *t, *next;
2122     for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2123         ASSERT(t->what_next != ThreadRelocated);
2124         next = t->link;
2125         deleteThread(cap, t);
2126     }
2127 }
2128
2129 /* startThread and  insertThread are now in GranSim.c -- HWL */
2130
2131
2132 /* -----------------------------------------------------------------------------
2133    Managing the suspended_ccalling_tasks list.
2134    Locks required: sched_mutex
2135    -------------------------------------------------------------------------- */
2136
2137 STATIC_INLINE void
2138 suspendTask (Capability *cap, Task *task)
2139 {
2140     ASSERT(task->next == NULL && task->prev == NULL);
2141     task->next = cap->suspended_ccalling_tasks;
2142     task->prev = NULL;
2143     if (cap->suspended_ccalling_tasks) {
2144         cap->suspended_ccalling_tasks->prev = task;
2145     }
2146     cap->suspended_ccalling_tasks = task;
2147 }
2148
2149 STATIC_INLINE void
2150 recoverSuspendedTask (Capability *cap, Task *task)
2151 {
2152     if (task->prev) {
2153         task->prev->next = task->next;
2154     } else {
2155         ASSERT(cap->suspended_ccalling_tasks == task);
2156         cap->suspended_ccalling_tasks = task->next;
2157     }
2158     if (task->next) {
2159         task->next->prev = task->prev;
2160     }
2161     task->next = task->prev = NULL;
2162 }
2163
2164 /* ---------------------------------------------------------------------------
2165  * Suspending & resuming Haskell threads.
2166  * 
2167  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2168  * its capability before calling the C function.  This allows another
2169  * task to pick up the capability and carry on running Haskell
2170  * threads.  It also means that if the C call blocks, it won't lock
2171  * the whole system.
2172  *
2173  * The Haskell thread making the C call is put to sleep for the
2174  * duration of the call, on the susepended_ccalling_threads queue.  We
2175  * give out a token to the task, which it can use to resume the thread
2176  * on return from the C function.
2177  * ------------------------------------------------------------------------- */
2178    
2179 void *
2180 suspendThread (StgRegTable *reg)
2181 {
2182   Capability *cap;
2183   int saved_errno = errno;
2184   StgTSO *tso;
2185   Task *task;
2186
2187   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2188    */
2189   cap = regTableToCapability(reg);
2190
2191   task = cap->running_task;
2192   tso = cap->r.rCurrentTSO;
2193
2194   IF_DEBUG(scheduler,
2195            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2196
2197   // XXX this might not be necessary --SDM
2198   tso->what_next = ThreadRunGHC;
2199
2200   threadPaused(cap,tso);
2201
2202   if(tso->blocked_exceptions == NULL)  {
2203       tso->why_blocked = BlockedOnCCall;
2204       tso->blocked_exceptions = END_TSO_QUEUE;
2205   } else {
2206       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2207   }
2208
2209   // Hand back capability
2210   task->suspended_tso = tso;
2211
2212   ACQUIRE_LOCK(&cap->lock);
2213
2214   suspendTask(cap,task);
2215   cap->in_haskell = rtsFalse;
2216   releaseCapability_(cap);
2217   
2218   RELEASE_LOCK(&cap->lock);
2219
2220 #if defined(THREADED_RTS)
2221   /* Preparing to leave the RTS, so ensure there's a native thread/task
2222      waiting to take over.
2223   */
2224   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2225 #endif
2226
2227   errno = saved_errno;
2228   return task;
2229 }
2230
2231 StgRegTable *
2232 resumeThread (void *task_)
2233 {
2234     StgTSO *tso;
2235     Capability *cap;
2236     int saved_errno = errno;
2237     Task *task = task_;
2238
2239     cap = task->cap;
2240     // Wait for permission to re-enter the RTS with the result.
2241     waitForReturnCapability(&cap,task);
2242     // we might be on a different capability now... but if so, our
2243     // entry on the suspended_ccalling_tasks list will also have been
2244     // migrated.
2245
2246     // Remove the thread from the suspended list
2247     recoverSuspendedTask(cap,task);
2248
2249     tso = task->suspended_tso;
2250     task->suspended_tso = NULL;
2251     tso->link = END_TSO_QUEUE;
2252     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2253     
2254     if (tso->why_blocked == BlockedOnCCall) {
2255         awakenBlockedQueue(cap,tso->blocked_exceptions);
2256         tso->blocked_exceptions = NULL;
2257     }
2258     
2259     /* Reset blocking status */
2260     tso->why_blocked  = NotBlocked;
2261     
2262     cap->r.rCurrentTSO = tso;
2263     cap->in_haskell = rtsTrue;
2264     errno = saved_errno;
2265
2266     /* We might have GC'd, mark the TSO dirty again */
2267     dirtyTSO(tso);
2268
2269     return &cap->r;
2270 }
2271
2272 /* ---------------------------------------------------------------------------
2273  * Comparing Thread ids.
2274  *
2275  * This is used from STG land in the implementation of the
2276  * instances of Eq/Ord for ThreadIds.
2277  * ------------------------------------------------------------------------ */
2278
2279 int
2280 cmp_thread(StgPtr tso1, StgPtr tso2) 
2281
2282   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2283   StgThreadID id2 = ((StgTSO *)tso2)->id;
2284  
2285   if (id1 < id2) return (-1);
2286   if (id1 > id2) return 1;
2287   return 0;
2288 }
2289
2290 /* ---------------------------------------------------------------------------
2291  * Fetching the ThreadID from an StgTSO.
2292  *
2293  * This is used in the implementation of Show for ThreadIds.
2294  * ------------------------------------------------------------------------ */
2295 int
2296 rts_getThreadId(StgPtr tso) 
2297 {
2298   return ((StgTSO *)tso)->id;
2299 }
2300
2301 #ifdef DEBUG
2302 void
2303 labelThread(StgPtr tso, char *label)
2304 {
2305   int len;
2306   void *buf;
2307
2308   /* Caveat: Once set, you can only set the thread name to "" */
2309   len = strlen(label)+1;
2310   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2311   strncpy(buf,label,len);
2312   /* Update will free the old memory for us */
2313   updateThreadLabel(((StgTSO *)tso)->id,buf);
2314 }
2315 #endif /* DEBUG */
2316
2317 /* ---------------------------------------------------------------------------
2318    Create a new thread.
2319
2320    The new thread starts with the given stack size.  Before the
2321    scheduler can run, however, this thread needs to have a closure
2322    (and possibly some arguments) pushed on its stack.  See
2323    pushClosure() in Schedule.h.
2324
2325    createGenThread() and createIOThread() (in SchedAPI.h) are
2326    convenient packaged versions of this function.
2327
2328    currently pri (priority) is only used in a GRAN setup -- HWL
2329    ------------------------------------------------------------------------ */
2330 #if defined(GRAN)
2331 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2332 StgTSO *
2333 createThread(nat size, StgInt pri)
2334 #else
2335 StgTSO *
2336 createThread(Capability *cap, nat size)
2337 #endif
2338 {
2339     StgTSO *tso;
2340     nat stack_size;
2341
2342     /* sched_mutex is *not* required */
2343
2344     /* First check whether we should create a thread at all */
2345 #if defined(PARALLEL_HASKELL)
2346     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2347     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2348         threadsIgnored++;
2349         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2350                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2351         return END_TSO_QUEUE;
2352     }
2353     threadsCreated++;
2354 #endif
2355
2356 #if defined(GRAN)
2357     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2358 #endif
2359
2360     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2361
2362     /* catch ridiculously small stack sizes */
2363     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2364         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2365     }
2366
2367     stack_size = size - TSO_STRUCT_SIZEW;
2368     
2369     tso = (StgTSO *)allocateLocal(cap, size);
2370     TICK_ALLOC_TSO(stack_size, 0);
2371
2372     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2373 #if defined(GRAN)
2374     SET_GRAN_HDR(tso, ThisPE);
2375 #endif
2376
2377     // Always start with the compiled code evaluator
2378     tso->what_next = ThreadRunGHC;
2379
2380     tso->why_blocked  = NotBlocked;
2381     tso->blocked_exceptions = NULL;
2382     tso->flags = TSO_DIRTY;
2383     
2384     tso->saved_errno = 0;
2385     tso->bound = NULL;
2386     
2387     tso->stack_size     = stack_size;
2388     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2389                           - TSO_STRUCT_SIZEW;
2390     tso->sp             = (P_)&(tso->stack) + stack_size;
2391
2392     tso->trec = NO_TREC;
2393     
2394 #ifdef PROFILING
2395     tso->prof.CCCS = CCS_MAIN;
2396 #endif
2397     
2398   /* put a stop frame on the stack */
2399     tso->sp -= sizeofW(StgStopFrame);
2400     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2401     tso->link = END_TSO_QUEUE;
2402     
2403   // ToDo: check this
2404 #if defined(GRAN)
2405     /* uses more flexible routine in GranSim */
2406     insertThread(tso, CurrentProc);
2407 #else
2408     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2409      * from its creation
2410      */
2411 #endif
2412     
2413 #if defined(GRAN) 
2414     if (RtsFlags.GranFlags.GranSimStats.Full) 
2415         DumpGranEvent(GR_START,tso);
2416 #elif defined(PARALLEL_HASKELL)
2417     if (RtsFlags.ParFlags.ParStats.Full) 
2418         DumpGranEvent(GR_STARTQ,tso);
2419     /* HACk to avoid SCHEDULE 
2420        LastTSO = tso; */
2421 #endif
2422     
2423     /* Link the new thread on the global thread list.
2424      */
2425     ACQUIRE_LOCK(&sched_mutex);
2426     tso->id = next_thread_id++;  // while we have the mutex
2427     tso->global_link = all_threads;
2428     all_threads = tso;
2429     RELEASE_LOCK(&sched_mutex);
2430     
2431 #if defined(DIST)
2432     tso->dist.priority = MandatoryPriority; //by default that is...
2433 #endif
2434     
2435 #if defined(GRAN)
2436     tso->gran.pri = pri;
2437 # if defined(DEBUG)
2438     tso->gran.magic = TSO_MAGIC; // debugging only
2439 # endif
2440     tso->gran.sparkname   = 0;
2441     tso->gran.startedat   = CURRENT_TIME; 
2442     tso->gran.exported    = 0;
2443     tso->gran.basicblocks = 0;
2444     tso->gran.allocs      = 0;
2445     tso->gran.exectime    = 0;
2446     tso->gran.fetchtime   = 0;
2447     tso->gran.fetchcount  = 0;
2448     tso->gran.blocktime   = 0;
2449     tso->gran.blockcount  = 0;
2450     tso->gran.blockedat   = 0;
2451     tso->gran.globalsparks = 0;
2452     tso->gran.localsparks  = 0;
2453     if (RtsFlags.GranFlags.Light)
2454         tso->gran.clock  = Now; /* local clock */
2455     else
2456         tso->gran.clock  = 0;
2457     
2458     IF_DEBUG(gran,printTSO(tso));
2459 #elif defined(PARALLEL_HASKELL)
2460 # if defined(DEBUG)
2461     tso->par.magic = TSO_MAGIC; // debugging only
2462 # endif
2463     tso->par.sparkname   = 0;
2464     tso->par.startedat   = CURRENT_TIME; 
2465     tso->par.exported    = 0;
2466     tso->par.basicblocks = 0;
2467     tso->par.allocs      = 0;
2468     tso->par.exectime    = 0;
2469     tso->par.fetchtime   = 0;
2470     tso->par.fetchcount  = 0;
2471     tso->par.blocktime   = 0;
2472     tso->par.blockcount  = 0;
2473     tso->par.blockedat   = 0;
2474     tso->par.globalsparks = 0;
2475     tso->par.localsparks  = 0;
2476 #endif
2477     
2478 #if defined(GRAN)
2479     globalGranStats.tot_threads_created++;
2480     globalGranStats.threads_created_on_PE[CurrentProc]++;
2481     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2482     globalGranStats.tot_sq_probes++;
2483 #elif defined(PARALLEL_HASKELL)
2484     // collect parallel global statistics (currently done together with GC stats)
2485     if (RtsFlags.ParFlags.ParStats.Global &&
2486         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2487         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2488         globalParStats.tot_threads_created++;
2489     }
2490 #endif 
2491     
2492 #if defined(GRAN)
2493     IF_GRAN_DEBUG(pri,
2494                   sched_belch("==__ schedule: Created TSO %d (%p);",
2495                               CurrentProc, tso, tso->id));
2496 #elif defined(PARALLEL_HASKELL)
2497     IF_PAR_DEBUG(verbose,
2498                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2499                              (long)tso->id, tso, advisory_thread_count));
2500 #else
2501     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2502                                    (long)tso->id, (long)tso->stack_size));
2503 #endif    
2504     return tso;
2505 }
2506
2507 #if defined(PAR)
2508 /* RFP:
2509    all parallel thread creation calls should fall through the following routine.
2510 */
2511 StgTSO *
2512 createThreadFromSpark(rtsSpark spark) 
2513 { StgTSO *tso;
2514   ASSERT(spark != (rtsSpark)NULL);
2515 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2516   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2517   { threadsIgnored++;
2518     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2519           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2520     return END_TSO_QUEUE;
2521   }
2522   else
2523   { threadsCreated++;
2524     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2525     if (tso==END_TSO_QUEUE)     
2526       barf("createSparkThread: Cannot create TSO");
2527 #if defined(DIST)
2528     tso->priority = AdvisoryPriority;
2529 #endif
2530     pushClosure(tso,spark);
2531     addToRunQueue(tso);
2532     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2533   }
2534   return tso;
2535 }
2536 #endif
2537
2538 /*
2539   Turn a spark into a thread.
2540   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2541 */
2542 #if 0
2543 StgTSO *
2544 activateSpark (rtsSpark spark) 
2545 {
2546   StgTSO *tso;
2547
2548   tso = createSparkThread(spark);
2549   if (RtsFlags.ParFlags.ParStats.Full) {   
2550     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2551       IF_PAR_DEBUG(verbose,
2552                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2553                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2554   }
2555   // ToDo: fwd info on local/global spark to thread -- HWL
2556   // tso->gran.exported =  spark->exported;
2557   // tso->gran.locked =   !spark->global;
2558   // tso->gran.sparkname = spark->name;
2559
2560   return tso;
2561 }
2562 #endif
2563
2564 /* ---------------------------------------------------------------------------
2565  * scheduleThread()
2566  *
2567  * scheduleThread puts a thread on the end  of the runnable queue.
2568  * This will usually be done immediately after a thread is created.
2569  * The caller of scheduleThread must create the thread using e.g.
2570  * createThread and push an appropriate closure
2571  * on this thread's stack before the scheduler is invoked.
2572  * ------------------------------------------------------------------------ */
2573
2574 void
2575 scheduleThread(Capability *cap, StgTSO *tso)
2576 {
2577     // The thread goes at the *end* of the run-queue, to avoid possible
2578     // starvation of any threads already on the queue.
2579     appendToRunQueue(cap,tso);
2580 }
2581
2582 Capability *
2583 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2584 {
2585     Task *task;
2586
2587     // We already created/initialised the Task
2588     task = cap->running_task;
2589
2590     // This TSO is now a bound thread; make the Task and TSO
2591     // point to each other.
2592     tso->bound = task;
2593
2594     task->tso = tso;
2595     task->ret = ret;
2596     task->stat = NoStatus;
2597
2598     appendToRunQueue(cap,tso);
2599
2600     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2601
2602 #if defined(GRAN)
2603     /* GranSim specific init */
2604     CurrentTSO = m->tso;                // the TSO to run
2605     procStatus[MainProc] = Busy;        // status of main PE
2606     CurrentProc = MainProc;             // PE to run it on
2607 #endif
2608
2609     cap = schedule(cap,task);
2610
2611     ASSERT(task->stat != NoStatus);
2612     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2613
2614     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2615     return cap;
2616 }
2617
2618 /* ----------------------------------------------------------------------------
2619  * Starting Tasks
2620  * ------------------------------------------------------------------------- */
2621
2622 #if defined(THREADED_RTS)
2623 void
2624 workerStart(Task *task)
2625 {
2626     Capability *cap;
2627
2628     // See startWorkerTask().
2629     ACQUIRE_LOCK(&task->lock);
2630     cap = task->cap;
2631     RELEASE_LOCK(&task->lock);
2632
2633     // set the thread-local pointer to the Task:
2634     taskEnter(task);
2635
2636     // schedule() runs without a lock.
2637     cap = schedule(cap,task);
2638
2639     // On exit from schedule(), we have a Capability.
2640     releaseCapability(cap);
2641     taskStop(task);
2642 }
2643 #endif
2644
2645 /* ---------------------------------------------------------------------------
2646  * initScheduler()
2647  *
2648  * Initialise the scheduler.  This resets all the queues - if the
2649  * queues contained any threads, they'll be garbage collected at the
2650  * next pass.
2651  *
2652  * ------------------------------------------------------------------------ */
2653
2654 void 
2655 initScheduler(void)
2656 {
2657 #if defined(GRAN)
2658   nat i;
2659   for (i=0; i<=MAX_PROC; i++) {
2660     run_queue_hds[i]      = END_TSO_QUEUE;
2661     run_queue_tls[i]      = END_TSO_QUEUE;
2662     blocked_queue_hds[i]  = END_TSO_QUEUE;
2663     blocked_queue_tls[i]  = END_TSO_QUEUE;
2664     ccalling_threadss[i]  = END_TSO_QUEUE;
2665     blackhole_queue[i]    = END_TSO_QUEUE;
2666     sleeping_queue        = END_TSO_QUEUE;
2667   }
2668 #elif !defined(THREADED_RTS)
2669   blocked_queue_hd  = END_TSO_QUEUE;
2670   blocked_queue_tl  = END_TSO_QUEUE;
2671   sleeping_queue    = END_TSO_QUEUE;
2672 #endif
2673
2674   blackhole_queue   = END_TSO_QUEUE;
2675   all_threads       = END_TSO_QUEUE;
2676
2677   context_switch = 0;
2678   interrupted    = 0;
2679
2680   RtsFlags.ConcFlags.ctxtSwitchTicks =
2681       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2682       
2683 #if defined(THREADED_RTS)
2684   /* Initialise the mutex and condition variables used by
2685    * the scheduler. */
2686   initMutex(&sched_mutex);
2687 #endif
2688   
2689   ACQUIRE_LOCK(&sched_mutex);
2690
2691   /* A capability holds the state a native thread needs in
2692    * order to execute STG code. At least one capability is
2693    * floating around (only THREADED_RTS builds have more than one).
2694    */
2695   initCapabilities();
2696
2697   initTaskManager();
2698
2699 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2700   initSparkPools();
2701 #endif
2702
2703 #if defined(THREADED_RTS)
2704   /*
2705    * Eagerly start one worker to run each Capability, except for
2706    * Capability 0.  The idea is that we're probably going to start a
2707    * bound thread on Capability 0 pretty soon, so we don't want a
2708    * worker task hogging it.
2709    */
2710   { 
2711       nat i;
2712       Capability *cap;
2713       for (i = 1; i < n_capabilities; i++) {
2714           cap = &capabilities[i];
2715           ACQUIRE_LOCK(&cap->lock);
2716           startWorkerTask(cap, workerStart);
2717           RELEASE_LOCK(&cap->lock);
2718       }
2719   }
2720 #endif
2721
2722   RELEASE_LOCK(&sched_mutex);
2723 }
2724
2725 void
2726 exitScheduler( void )
2727 {
2728     interrupted = rtsTrue;
2729     shutting_down_scheduler = rtsTrue;
2730
2731 #if defined(THREADED_RTS)
2732     { 
2733         Task *task;
2734         nat i;
2735         
2736         ACQUIRE_LOCK(&sched_mutex);
2737         task = newBoundTask();
2738         RELEASE_LOCK(&sched_mutex);
2739
2740         for (i = 0; i < n_capabilities; i++) {
2741             shutdownCapability(&capabilities[i], task);
2742         }
2743         boundTaskExiting(task);
2744         stopTaskManager();
2745     }
2746 #endif
2747 }
2748
2749 /* ---------------------------------------------------------------------------
2750    Where are the roots that we know about?
2751
2752         - all the threads on the runnable queue
2753         - all the threads on the blocked queue
2754         - all the threads on the sleeping queue
2755         - all the thread currently executing a _ccall_GC
2756         - all the "main threads"
2757      
2758    ------------------------------------------------------------------------ */
2759
2760 /* This has to be protected either by the scheduler monitor, or by the
2761         garbage collection monitor (probably the latter).
2762         KH @ 25/10/99
2763 */
2764
2765 void
2766 GetRoots( evac_fn evac )
2767 {
2768     nat i;
2769     Capability *cap;
2770     Task *task;
2771
2772 #if defined(GRAN)
2773     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2774         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2775             evac((StgClosure **)&run_queue_hds[i]);
2776         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2777             evac((StgClosure **)&run_queue_tls[i]);
2778         
2779         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2780             evac((StgClosure **)&blocked_queue_hds[i]);
2781         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2782             evac((StgClosure **)&blocked_queue_tls[i]);
2783         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2784             evac((StgClosure **)&ccalling_threads[i]);
2785     }
2786
2787     markEventQueue();
2788
2789 #else /* !GRAN */
2790
2791     for (i = 0; i < n_capabilities; i++) {
2792         cap = &capabilities[i];
2793         evac((StgClosure **)&cap->run_queue_hd);
2794         evac((StgClosure **)&cap->run_queue_tl);
2795         
2796         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2797              task=task->next) {
2798             evac((StgClosure **)&task->suspended_tso);
2799         }
2800     }
2801     
2802 #if !defined(THREADED_RTS)
2803     evac((StgClosure **)(void *)&blocked_queue_hd);
2804     evac((StgClosure **)(void *)&blocked_queue_tl);
2805     evac((StgClosure **)(void *)&sleeping_queue);
2806 #endif 
2807 #endif
2808
2809     // evac((StgClosure **)&blackhole_queue);
2810
2811 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2812     markSparkQueue(evac);
2813 #endif
2814     
2815 #if defined(RTS_USER_SIGNALS)
2816     // mark the signal handlers (signals should be already blocked)
2817     markSignalHandlers(evac);
2818 #endif
2819 }
2820
2821 /* -----------------------------------------------------------------------------
2822    performGC
2823
2824    This is the interface to the garbage collector from Haskell land.
2825    We provide this so that external C code can allocate and garbage
2826    collect when called from Haskell via _ccall_GC.
2827
2828    It might be useful to provide an interface whereby the programmer
2829    can specify more roots (ToDo).
2830    
2831    This needs to be protected by the GC condition variable above.  KH.
2832    -------------------------------------------------------------------------- */
2833
2834 static void (*extra_roots)(evac_fn);
2835
2836 static void
2837 performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
2838 {
2839     Task *task = myTask();
2840
2841     if (task == NULL) {
2842         ACQUIRE_LOCK(&sched_mutex);
2843         task = newBoundTask();
2844         RELEASE_LOCK(&sched_mutex);
2845         scheduleDoGC(NULL,task,force_major, get_roots);
2846         boundTaskExiting(task);
2847     } else {
2848         scheduleDoGC(NULL,task,force_major, get_roots);
2849     }
2850 }
2851
2852 void
2853 performGC(void)
2854 {
2855     performGC_(rtsFalse, GetRoots);
2856 }
2857
2858 void
2859 performMajorGC(void)
2860 {
2861     performGC_(rtsTrue, GetRoots);
2862 }
2863
2864 static void
2865 AllRoots(evac_fn evac)
2866 {
2867     GetRoots(evac);             // the scheduler's roots
2868     extra_roots(evac);          // the user's roots
2869 }
2870
2871 void
2872 performGCWithRoots(void (*get_roots)(evac_fn))
2873 {
2874     extra_roots = get_roots;
2875     performGC_(rtsFalse, AllRoots);
2876 }
2877
2878 /* -----------------------------------------------------------------------------
2879    Stack overflow
2880
2881    If the thread has reached its maximum stack size, then raise the
2882    StackOverflow exception in the offending thread.  Otherwise
2883    relocate the TSO into a larger chunk of memory and adjust its stack
2884    size appropriately.
2885    -------------------------------------------------------------------------- */
2886
2887 static StgTSO *
2888 threadStackOverflow(Capability *cap, StgTSO *tso)
2889 {
2890   nat new_stack_size, stack_words;
2891   lnat new_tso_size;
2892   StgPtr new_sp;
2893   StgTSO *dest;
2894
2895   IF_DEBUG(sanity,checkTSO(tso));
2896   if (tso->stack_size >= tso->max_stack_size) {
2897
2898     IF_DEBUG(gc,
2899              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2900                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2901              /* If we're debugging, just print out the top of the stack */
2902              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2903                                               tso->sp+64)));
2904
2905     /* Send this thread the StackOverflow exception */
2906     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2907     return tso;
2908   }
2909
2910   /* Try to double the current stack size.  If that takes us over the
2911    * maximum stack size for this thread, then use the maximum instead.
2912    * Finally round up so the TSO ends up as a whole number of blocks.
2913    */
2914   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2915   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2916                                        TSO_STRUCT_SIZE)/sizeof(W_);
2917   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2918   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2919
2920   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2921
2922   dest = (StgTSO *)allocate(new_tso_size);
2923   TICK_ALLOC_TSO(new_stack_size,0);
2924
2925   /* copy the TSO block and the old stack into the new area */
2926   memcpy(dest,tso,TSO_STRUCT_SIZE);
2927   stack_words = tso->stack + tso->stack_size - tso->sp;
2928   new_sp = (P_)dest + new_tso_size - stack_words;
2929   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2930
2931   /* relocate the stack pointers... */
2932   dest->sp         = new_sp;
2933   dest->stack_size = new_stack_size;
2934         
2935   /* Mark the old TSO as relocated.  We have to check for relocated
2936    * TSOs in the garbage collector and any primops that deal with TSOs.
2937    *
2938    * It's important to set the sp value to just beyond the end
2939    * of the stack, so we don't attempt to scavenge any part of the
2940    * dead TSO's stack.
2941    */
2942   tso->what_next = ThreadRelocated;
2943   tso->link = dest;
2944   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2945   tso->why_blocked = NotBlocked;
2946
2947   IF_PAR_DEBUG(verbose,
2948                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2949                      tso->id, tso, tso->stack_size);
2950                /* If we're debugging, just print out the top of the stack */
2951                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2952                                                 tso->sp+64)));
2953   
2954   IF_DEBUG(sanity,checkTSO(tso));
2955 #if 0
2956   IF_DEBUG(scheduler,printTSO(dest));
2957 #endif
2958
2959   return dest;
2960 }
2961
2962 /* ---------------------------------------------------------------------------
2963    Wake up a queue that was blocked on some resource.
2964    ------------------------------------------------------------------------ */
2965
2966 #if defined(GRAN)
2967 STATIC_INLINE void
2968 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2969 {
2970 }
2971 #elif defined(PARALLEL_HASKELL)
2972 STATIC_INLINE void
2973 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2974 {
2975   /* write RESUME events to log file and
2976      update blocked and fetch time (depending on type of the orig closure) */
2977   if (RtsFlags.ParFlags.ParStats.Full) {
2978     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2979                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2980                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2981     if (emptyRunQueue())
2982       emitSchedule = rtsTrue;
2983
2984     switch (get_itbl(node)->type) {
2985         case FETCH_ME_BQ:
2986           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2987           break;
2988         case RBH:
2989         case FETCH_ME:
2990         case BLACKHOLE_BQ:
2991           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2992           break;
2993 #ifdef DIST
2994         case MVAR:
2995           break;
2996 #endif    
2997         default:
2998           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2999         }
3000       }
3001 }
3002 #endif
3003
3004 #if defined(GRAN)
3005 StgBlockingQueueElement *
3006 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3007 {
3008     StgTSO *tso;
3009     PEs node_loc, tso_loc;
3010
3011     node_loc = where_is(node); // should be lifted out of loop
3012     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3013     tso_loc = where_is((StgClosure *)tso);
3014     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3015       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
3016       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3017       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3018       // insertThread(tso, node_loc);
3019       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3020                 ResumeThread,
3021                 tso, node, (rtsSpark*)NULL);
3022       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3023       // len_local++;
3024       // len++;
3025     } else { // TSO is remote (actually should be FMBQ)
3026       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3027                                   RtsFlags.GranFlags.Costs.gunblocktime +
3028                                   RtsFlags.GranFlags.Costs.latency;
3029       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3030                 UnblockThread,
3031                 tso, node, (rtsSpark*)NULL);
3032       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3033       // len++;
3034     }
3035     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3036     IF_GRAN_DEBUG(bq,
3037                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3038                           (node_loc==tso_loc ? "Local" : "Global"), 
3039                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3040     tso->block_info.closure = NULL;
3041     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
3042                              tso->id, tso));
3043 }
3044 #elif defined(PARALLEL_HASKELL)
3045 StgBlockingQueueElement *
3046 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3047 {
3048     StgBlockingQueueElement *next;
3049
3050     switch (get_itbl(bqe)->type) {
3051     case TSO:
3052       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3053       /* if it's a TSO just push it onto the run_queue */
3054       next = bqe->link;
3055       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3056       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3057       threadRunnable();
3058       unblockCount(bqe, node);
3059       /* reset blocking status after dumping event */
3060       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3061       break;
3062
3063     case BLOCKED_FETCH:
3064       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3065       next = bqe->link;
3066       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3067       PendingFetches = (StgBlockedFetch *)bqe;
3068       break;
3069
3070 # if defined(DEBUG)
3071       /* can ignore this case in a non-debugging setup; 
3072          see comments on RBHSave closures above */
3073     case CONSTR:
3074       /* check that the closure is an RBHSave closure */
3075       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3076              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3077              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3078       break;
3079
3080     default:
3081       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3082            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3083            (StgClosure *)bqe);
3084 # endif
3085     }
3086   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3087   return next;
3088 }
3089 #endif
3090
3091 StgTSO *
3092 unblockOne(Capability *cap, StgTSO *tso)
3093 {
3094   StgTSO *next;
3095
3096   ASSERT(get_itbl(tso)->type == TSO);
3097   ASSERT(tso->why_blocked != NotBlocked);
3098   tso->why_blocked = NotBlocked;
3099   next = tso->link;
3100   tso->link = END_TSO_QUEUE;
3101
3102   // We might have just migrated this TSO to our Capability:
3103   if (tso->bound) {
3104       tso->bound->cap = cap;
3105   }
3106
3107   appendToRunQueue(cap,tso);
3108
3109   // we're holding a newly woken thread, make sure we context switch
3110   // quickly so we can migrate it if necessary.
3111   context_switch = 1;
3112   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3113   return next;
3114 }
3115
3116
3117 #if defined(GRAN)
3118 void 
3119 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3120 {
3121   StgBlockingQueueElement *bqe;
3122   PEs node_loc;
3123   nat len = 0; 
3124
3125   IF_GRAN_DEBUG(bq, 
3126                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3127                       node, CurrentProc, CurrentTime[CurrentProc], 
3128                       CurrentTSO->id, CurrentTSO));
3129
3130   node_loc = where_is(node);
3131
3132   ASSERT(q == END_BQ_QUEUE ||
3133          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3134          get_itbl(q)->type == CONSTR); // closure (type constructor)
3135   ASSERT(is_unique(node));
3136
3137   /* FAKE FETCH: magically copy the node to the tso's proc;
3138      no Fetch necessary because in reality the node should not have been 
3139      moved to the other PE in the first place
3140   */
3141   if (CurrentProc!=node_loc) {
3142     IF_GRAN_DEBUG(bq, 
3143                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3144                         node, node_loc, CurrentProc, CurrentTSO->id, 
3145                         // CurrentTSO, where_is(CurrentTSO),
3146                         node->header.gran.procs));
3147     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3148     IF_GRAN_DEBUG(bq, 
3149                   debugBelch("## new bitmask of node %p is %#x\n",
3150                         node, node->header.gran.procs));
3151     if (RtsFlags.GranFlags.GranSimStats.Global) {
3152       globalGranStats.tot_fake_fetches++;
3153     }
3154   }
3155
3156   bqe = q;
3157   // ToDo: check: ASSERT(CurrentProc==node_loc);
3158   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3159     //next = bqe->link;
3160     /* 
3161        bqe points to the current element in the queue
3162        next points to the next element in the queue
3163     */
3164     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3165     //tso_loc = where_is(tso);
3166     len++;
3167     bqe = unblockOne(bqe, node);
3168   }
3169
3170   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3171      the closure to make room for the anchor of the BQ */
3172   if (bqe!=END_BQ_QUEUE) {
3173     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3174     /*
3175     ASSERT((info_ptr==&RBH_Save_0_info) ||
3176            (info_ptr==&RBH_Save_1_info) ||
3177            (info_ptr==&RBH_Save_2_info));
3178     */
3179     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3180     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3181     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3182
3183     IF_GRAN_DEBUG(bq,
3184                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3185                         node, info_type(node)));
3186   }
3187
3188   /* statistics gathering */
3189   if (RtsFlags.GranFlags.GranSimStats.Global) {
3190     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3191     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3192     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3193     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3194   }
3195   IF_GRAN_DEBUG(bq,
3196                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3197                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3198 }
3199 #elif defined(PARALLEL_HASKELL)
3200 void 
3201 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3202 {
3203   StgBlockingQueueElement *bqe;
3204
3205   IF_PAR_DEBUG(verbose, 
3206                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3207                      node, mytid));
3208 #ifdef DIST  
3209   //RFP
3210   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3211     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3212     return;
3213   }
3214 #endif
3215   
3216   ASSERT(q == END_BQ_QUEUE ||
3217          get_itbl(q)->type == TSO ||           
3218          get_itbl(q)->type == BLOCKED_FETCH || 
3219          get_itbl(q)->type == CONSTR); 
3220
3221   bqe = q;
3222   while (get_itbl(bqe)->type==TSO || 
3223          get_itbl(bqe)->type==BLOCKED_FETCH) {
3224     bqe = unblockOne(bqe, node);
3225   }
3226 }
3227
3228 #else   /* !GRAN && !PARALLEL_HASKELL */
3229
3230 void
3231 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3232 {
3233     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3234                              // Exception.cmm
3235     while (tso != END_TSO_QUEUE) {
3236         tso = unblockOne(cap,tso);
3237     }
3238 }
3239 #endif
3240
3241 /* ---------------------------------------------------------------------------
3242    Interrupt execution
3243    - usually called inside a signal handler so it mustn't do anything fancy.   
3244    ------------------------------------------------------------------------ */
3245
3246 void
3247 interruptStgRts(void)
3248 {
3249     interrupted    = 1;
3250     context_switch = 1;
3251 #if defined(THREADED_RTS)
3252     prodAllCapabilities();
3253 #endif
3254 }
3255
3256 /* -----------------------------------------------------------------------------
3257    Unblock a thread
3258
3259    This is for use when we raise an exception in another thread, which
3260    may be blocked.
3261    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3262    -------------------------------------------------------------------------- */
3263
3264 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3265 /*
3266   NB: only the type of the blocking queue is different in GranSim and GUM
3267       the operations on the queue-elements are the same
3268       long live polymorphism!
3269
3270   Locks: sched_mutex is held upon entry and exit.
3271
3272 */
3273 static void
3274 unblockThread(Capability *cap, StgTSO *tso)
3275 {
3276   StgBlockingQueueElement *t, **last;
3277
3278   switch (tso->why_blocked) {
3279
3280   case NotBlocked:
3281     return;  /* not blocked */
3282
3283   case BlockedOnSTM:
3284     // Be careful: nothing to do here!  We tell the scheduler that the thread
3285     // is runnable and we leave it to the stack-walking code to abort the 
3286     // transaction while unwinding the stack.  We should perhaps have a debugging
3287     // test to make sure that this really happens and that the 'zombie' transaction
3288     // does not get committed.
3289     goto done;
3290
3291   case BlockedOnMVar:
3292     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3293     {
3294       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3295       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3296
3297       last = (StgBlockingQueueElement **)&mvar->head;
3298       for (t = (StgBlockingQueueElement *)mvar->head; 
3299            t != END_BQ_QUEUE; 
3300            last = &t->link, last_tso = t, t = t->link) {
3301         if (t == (StgBlockingQueueElement *)tso) {
3302           *last = (StgBlockingQueueElement *)tso->link;
3303           if (mvar->tail == tso) {
3304             mvar->tail = (StgTSO *)last_tso;
3305           }
3306           goto done;
3307         }
3308       }
3309       barf("unblockThread (MVAR): TSO not found");
3310     }
3311
3312   case BlockedOnBlackHole:
3313     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3314     {
3315       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3316
3317       last = &bq->blocking_queue;
3318       for (t = bq->blocking_queue; 
3319            t != END_BQ_QUEUE; 
3320            last = &t->link, t = t->link) {
3321         if (t == (StgBlockingQueueElement *)tso) {
3322           *last = (StgBlockingQueueElement *)tso->link;
3323           goto done;
3324         }
3325       }
3326       barf("unblockThread (BLACKHOLE): TSO not found");
3327     }
3328
3329   case BlockedOnException:
3330     {
3331       StgTSO *target  = tso->block_info.tso;
3332
3333       ASSERT(get_itbl(target)->type == TSO);
3334
3335       if (target->what_next == ThreadRelocated) {
3336           target = target->link;
3337           ASSERT(get_itbl(target)->type == TSO);
3338       }
3339
3340       ASSERT(target->blocked_exceptions != NULL);
3341
3342       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3343       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3344            t != END_BQ_QUEUE; 
3345            last = &t->link, t = t->link) {
3346         ASSERT(get_itbl(t)->type == TSO);
3347         if (t == (StgBlockingQueueElement *)tso) {
3348           *last = (StgBlockingQueueElement *)tso->link;
3349           goto done;
3350         }
3351       }
3352       barf("unblockThread (Exception): TSO not found");
3353     }
3354
3355   case BlockedOnRead:
3356   case BlockedOnWrite:
3357 #if defined(mingw32_HOST_OS)
3358   case BlockedOnDoProc:
3359 #endif
3360     {
3361       /* take TSO off blocked_queue */
3362       StgBlockingQueueElement *prev = NULL;
3363       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3364            prev = t, t = t->link) {
3365         if (t == (StgBlockingQueueElement *)tso) {
3366           if (prev == NULL) {
3367             blocked_queue_hd = (StgTSO *)t->link;
3368             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3369               blocked_queue_tl = END_TSO_QUEUE;
3370             }
3371           } else {
3372             prev->link = t->link;
3373             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3374               blocked_queue_tl = (StgTSO *)prev;
3375             }
3376           }
3377 #if defined(mingw32_HOST_OS)
3378           /* (Cooperatively) signal that the worker thread should abort
3379            * the request.
3380            */
3381           abandonWorkRequest(tso->block_info.async_result->reqID);
3382 #endif
3383           goto done;
3384         }
3385       }
3386       barf("unblockThread (I/O): TSO not found");
3387     }
3388
3389   case BlockedOnDelay:
3390     {
3391       /* take TSO off sleeping_queue */
3392       StgBlockingQueueElement *prev = NULL;
3393       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3394            prev = t, t = t->link) {
3395         if (t == (StgBlockingQueueElement *)tso) {
3396           if (prev == NULL) {
3397             sleeping_queue = (StgTSO *)t->link;
3398           } else {
3399             prev->link = t->link;
3400           }
3401           goto done;
3402         }
3403       }
3404       barf("unblockThread (delay): TSO not found");
3405     }
3406
3407   default:
3408     barf("unblockThread");
3409   }
3410
3411  done:
3412   tso->link = END_TSO_QUEUE;
3413   tso->why_blocked = NotBlocked;
3414   tso->block_info.closure = NULL;
3415   pushOnRunQueue(cap,tso);
3416 }
3417 #else
3418 static void
3419 unblockThread(Capability *cap, StgTSO *tso)
3420 {
3421   StgTSO *t, **last;
3422   
3423   /* To avoid locking unnecessarily. */
3424   if (tso->why_blocked == NotBlocked) {
3425     return;
3426   }
3427
3428   switch (tso->why_blocked) {
3429
3430   case BlockedOnSTM:
3431     // Be careful: nothing to do here!  We tell the scheduler that the thread
3432     // is runnable and we leave it to the stack-walking code to abort the 
3433     // transaction while unwinding the stack.  We should perhaps have a debugging
3434     // test to make sure that this really happens and that the 'zombie' transaction
3435     // does not get committed.
3436     goto done;
3437
3438   case BlockedOnMVar:
3439     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3440     {
3441       StgTSO *last_tso = END_TSO_QUEUE;
3442       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3443
3444       last = &mvar->head;
3445       for (t = mvar->head; t != END_TSO_QUEUE; 
3446            last = &t->link, last_tso = t, t = t->link) {
3447         if (t == tso) {
3448           *last = tso->link;
3449           if (mvar->tail == tso) {
3450             mvar->tail = last_tso;
3451           }
3452           goto done;
3453         }
3454       }
3455       barf("unblockThread (MVAR): TSO not found");
3456     }
3457
3458   case BlockedOnBlackHole:
3459     {
3460       last = &blackhole_queue;
3461       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3462            last = &t->link, t = t->link) {
3463         if (t == tso) {
3464           *last = tso->link;
3465           goto done;
3466         }
3467       }
3468       barf("unblockThread (BLACKHOLE): TSO not found");
3469     }
3470
3471   case BlockedOnException:
3472     {
3473       StgTSO *target  = tso->block_info.tso;
3474
3475       ASSERT(get_itbl(target)->type == TSO);
3476
3477       while (target->what_next == ThreadRelocated) {
3478           target = target->link;
3479           ASSERT(get_itbl(target)->type == TSO);
3480       }
3481       
3482       ASSERT(target->blocked_exceptions != NULL);
3483
3484       last = &target->blocked_exceptions;
3485       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3486            last = &t->link, t = t->link) {
3487         ASSERT(get_itbl(t)->type == TSO);
3488         if (t == tso) {
3489           *last = tso->link;
3490           goto done;
3491         }
3492       }
3493       barf("unblockThread (Exception): TSO not found");
3494     }
3495
3496 #if !defined(THREADED_RTS)
3497   case BlockedOnRead:
3498   case BlockedOnWrite:
3499 #if defined(mingw32_HOST_OS)
3500   case BlockedOnDoProc:
3501 #endif
3502     {
3503       StgTSO *prev = NULL;
3504       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3505            prev = t, t = t->link) {
3506         if (t == tso) {
3507           if (prev == NULL) {
3508             blocked_queue_hd = t->link;
3509             if (blocked_queue_tl == t) {
3510               blocked_queue_tl = END_TSO_QUEUE;
3511             }
3512           } else {
3513             prev->link = t->link;
3514             if (blocked_queue_tl == t) {
3515               blocked_queue_tl = prev;
3516             }
3517           }
3518 #if defined(mingw32_HOST_OS)
3519           /* (Cooperatively) signal that the worker thread should abort
3520            * the request.
3521            */
3522           abandonWorkRequest(tso->block_info.async_result->reqID);
3523 #endif
3524           goto done;
3525         }
3526       }
3527       barf("unblockThread (I/O): TSO not found");
3528     }
3529
3530   case BlockedOnDelay:
3531     {
3532       StgTSO *prev = NULL;
3533       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3534            prev = t, t = t->link) {
3535         if (t == tso) {
3536           if (prev == NULL) {
3537             sleeping_queue = t->link;
3538           } else {
3539             prev->link = t->link;
3540           }
3541           goto done;
3542         }
3543       }
3544       barf("unblockThread (delay): TSO not found");
3545     }
3546 #endif
3547
3548   default:
3549     barf("unblockThread");
3550   }
3551
3552  done:
3553   tso->link = END_TSO_QUEUE;
3554   tso->why_blocked = NotBlocked;
3555   tso->block_info.closure = NULL;
3556   appendToRunQueue(cap,tso);
3557 }
3558 #endif
3559
3560 /* -----------------------------------------------------------------------------
3561  * checkBlackHoles()
3562  *
3563  * Check the blackhole_queue for threads that can be woken up.  We do
3564  * this periodically: before every GC, and whenever the run queue is
3565  * empty.
3566  *
3567  * An elegant solution might be to just wake up all the blocked
3568  * threads with awakenBlockedQueue occasionally: they'll go back to
3569  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3570  * doesn't give us a way to tell whether we've actually managed to
3571  * wake up any threads, so we would be busy-waiting.
3572  *
3573  * -------------------------------------------------------------------------- */
3574
3575 static rtsBool
3576 checkBlackHoles (Capability *cap)
3577 {
3578     StgTSO **prev, *t;
3579     rtsBool any_woke_up = rtsFalse;
3580     StgHalfWord type;
3581
3582     // blackhole_queue is global:
3583     ASSERT_LOCK_HELD(&sched_mutex);
3584
3585     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3586
3587     // ASSUMES: sched_mutex
3588     prev = &blackhole_queue;
3589     t = blackhole_queue;
3590     while (t != END_TSO_QUEUE) {
3591         ASSERT(t->why_blocked == BlockedOnBlackHole);
3592         type = get_itbl(t->block_info.closure)->type;
3593         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3594             IF_DEBUG(sanity,checkTSO(t));
3595             t = unblockOne(cap, t);
3596             // urk, the threads migrate to the current capability
3597             // here, but we'd like to keep them on the original one.
3598             *prev = t;
3599             any_woke_up = rtsTrue;
3600         } else {
3601             prev = &t->link;
3602             t = t->link;
3603         }
3604     }
3605
3606     return any_woke_up;
3607 }
3608
3609 /* -----------------------------------------------------------------------------
3610  * raiseAsync()
3611  *
3612  * The following function implements the magic for raising an
3613  * asynchronous exception in an existing thread.
3614  *
3615  * We first remove the thread from any queue on which it might be
3616  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3617  *
3618  * We strip the stack down to the innermost CATCH_FRAME, building
3619  * thunks in the heap for all the active computations, so they can 
3620  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3621  * an application of the handler to the exception, and push it on
3622  * the top of the stack.
3623  * 
3624  * How exactly do we save all the active computations?  We create an
3625  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3626  * AP_STACKs pushes everything from the corresponding update frame
3627  * upwards onto the stack.  (Actually, it pushes everything up to the
3628  * next update frame plus a pointer to the next AP_STACK object.
3629  * Entering the next AP_STACK object pushes more onto the stack until we
3630  * reach the last AP_STACK object - at which point the stack should look
3631  * exactly as it did when we killed the TSO and we can continue
3632  * execution by entering the closure on top of the stack.
3633  *
3634  * We can also kill a thread entirely - this happens if either (a) the 
3635  * exception passed to raiseAsync is NULL, or (b) there's no
3636  * CATCH_FRAME on the stack.  In either case, we strip the entire
3637  * stack and replace the thread with a zombie.
3638  *
3639  * ToDo: in THREADED_RTS mode, this function is only safe if either
3640  * (a) we hold all the Capabilities (eg. in GC, or if there is only
3641  * one Capability), or (b) we own the Capability that the TSO is
3642  * currently blocked on or on the run queue of.
3643  *
3644  * -------------------------------------------------------------------------- */
3645  
3646 void
3647 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3648 {
3649     raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3650 }
3651
3652 void
3653 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3654 {
3655     raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3656 }
3657
3658 static void
3659 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3660             rtsBool stop_at_atomically, StgPtr stop_here)
3661 {
3662     StgRetInfoTable *info;
3663     StgPtr sp, frame;
3664     nat i;
3665   
3666     // Thread already dead?
3667     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3668         return;
3669     }
3670
3671     IF_DEBUG(scheduler, 
3672              sched_belch("raising exception in thread %ld.", (long)tso->id));
3673     
3674     // Remove it from any blocking queues
3675     unblockThread(cap,tso);
3676
3677     // mark it dirty; we're about to change its stack.
3678     dirtyTSO(tso);
3679
3680     sp = tso->sp;
3681     
3682     // The stack freezing code assumes there's a closure pointer on
3683     // the top of the stack, so we have to arrange that this is the case...
3684     //
3685     if (sp[0] == (W_)&stg_enter_info) {
3686         sp++;
3687     } else {
3688         sp--;
3689         sp[0] = (W_)&stg_dummy_ret_closure;
3690     }
3691
3692     frame = sp + 1;
3693     while (stop_here == NULL || frame < stop_here) {
3694
3695         // 1. Let the top of the stack be the "current closure"
3696         //
3697         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3698         // CATCH_FRAME.
3699         //
3700         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3701         // current closure applied to the chunk of stack up to (but not
3702         // including) the update frame.  This closure becomes the "current
3703         // closure".  Go back to step 2.
3704         //
3705         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3706         // top of the stack applied to the exception.
3707         // 
3708         // 5. If it's a STOP_FRAME, then kill the thread.
3709         // 
3710         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3711         // transaction
3712        
3713         info = get_ret_itbl((StgClosure *)frame);
3714
3715         switch (info->i.type) {
3716
3717         case UPDATE_FRAME:
3718         {
3719             StgAP_STACK * ap;
3720             nat words;
3721             
3722             // First build an AP_STACK consisting of the stack chunk above the
3723             // current update frame, with the top word on the stack as the
3724             // fun field.
3725             //
3726             words = frame - sp - 1;
3727             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3728             
3729             ap->size = words;
3730             ap->fun  = (StgClosure *)sp[0];
3731             sp++;
3732             for(i=0; i < (nat)words; ++i) {
3733                 ap->payload[i] = (StgClosure *)*sp++;
3734             }
3735             
3736             SET_HDR(ap,&stg_AP_STACK_info,
3737                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3738             TICK_ALLOC_UP_THK(words+1,0);
3739             
3740             IF_DEBUG(scheduler,
3741                      debugBelch("sched: Updating ");
3742                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3743                      debugBelch(" with ");
3744                      printObj((StgClosure *)ap);
3745                 );
3746
3747             // Replace the updatee with an indirection
3748             //
3749             // Warning: if we're in a loop, more than one update frame on
3750             // the stack may point to the same object.  Be careful not to
3751             // overwrite an IND_OLDGEN in this case, because we'll screw
3752             // up the mutable lists.  To be on the safe side, don't
3753             // overwrite any kind of indirection at all.  See also
3754             // threadSqueezeStack in GC.c, where we have to make a similar
3755             // check.
3756             //
3757             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3758                 // revert the black hole
3759                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3760                                (StgClosure *)ap);
3761             }
3762             sp += sizeofW(StgUpdateFrame) - 1;
3763             sp[0] = (W_)ap; // push onto stack
3764             frame = sp + 1;
3765             continue; //no need to bump frame
3766         }
3767
3768         case STOP_FRAME:
3769             // We've stripped the entire stack, the thread is now dead.
3770             tso->what_next = ThreadKilled;
3771             tso->sp = frame + sizeofW(StgStopFrame);
3772             return;
3773
3774         case CATCH_FRAME:
3775             // If we find a CATCH_FRAME, and we've got an exception to raise,
3776             // then build the THUNK raise(exception), and leave it on
3777             // top of the CATCH_FRAME ready to enter.
3778             //
3779         {
3780 #ifdef PROFILING
3781             StgCatchFrame *cf = (StgCatchFrame *)frame;
3782 #endif
3783             StgThunk *raise;
3784             
3785             if (exception == NULL) break;
3786
3787             // we've got an exception to raise, so let's pass it to the
3788             // handler in this frame.
3789             //
3790             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3791             TICK_ALLOC_SE_THK(1,0);
3792             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3793             raise->payload[0] = exception;
3794             
3795             // throw away the stack from Sp up to the CATCH_FRAME.
3796             //
3797             sp = frame - 1;
3798             
3799             /* Ensure that async excpetions are blocked now, so we don't get
3800              * a surprise exception before we get around to executing the
3801              * handler.
3802              */
3803             if (tso->blocked_exceptions == NULL) {
3804                 tso->blocked_exceptions = END_TSO_QUEUE;
3805             }
3806
3807             /* Put the newly-built THUNK on top of the stack, ready to execute
3808              * when the thread restarts.
3809              */
3810             sp[0] = (W_)raise;
3811             sp[-1] = (W_)&stg_enter_info;
3812             tso->sp = sp-1;
3813             tso->what_next = ThreadRunGHC;
3814             IF_DEBUG(sanity, checkTSO(tso));
3815             return;
3816         }
3817             
3818         case ATOMICALLY_FRAME:
3819             if (stop_at_atomically) {
3820                 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3821                 stmCondemnTransaction(cap, tso -> trec);
3822 #ifdef REG_R1
3823                 tso->sp = frame;
3824 #else
3825                 // R1 is not a register: the return convention for IO in
3826                 // this case puts the return value on the stack, so we
3827                 // need to set up the stack to return to the atomically
3828                 // frame properly...
3829                 tso->sp = frame - 2;
3830                 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3831                 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3832 #endif
3833                 tso->what_next = ThreadRunGHC;
3834                 return;
3835             }
3836             // Not stop_at_atomically... fall through and abort the
3837             // transaction.
3838             
3839         case CATCH_RETRY_FRAME:
3840             // IF we find an ATOMICALLY_FRAME then we abort the
3841             // current transaction and propagate the exception.  In
3842             // this case (unlike ordinary exceptions) we do not care
3843             // whether the transaction is valid or not because its
3844             // possible validity cannot have caused the exception
3845             // and will not be visible after the abort.
3846             IF_DEBUG(stm,
3847                      debugBelch("Found atomically block delivering async exception\n"));
3848             StgTRecHeader *trec = tso -> trec;
3849             StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3850             stmAbortTransaction(cap, trec);
3851             tso -> trec = outer;
3852             break;
3853             
3854         default:
3855             break;
3856         }
3857
3858         // move on to the next stack frame
3859         frame += stack_frame_sizeW((StgClosure *)frame);
3860     }
3861
3862     // if we got here, then we stopped at stop_here
3863     ASSERT(stop_here != NULL);
3864 }
3865
3866 /* -----------------------------------------------------------------------------
3867    Deleting threads
3868
3869    This is used for interruption (^C) and forking, and corresponds to
3870    raising an exception but without letting the thread catch the
3871    exception.
3872    -------------------------------------------------------------------------- */
3873
3874 static void 
3875 deleteThread (Capability *cap, StgTSO *tso)
3876 {
3877   if (tso->why_blocked != BlockedOnCCall &&
3878       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3879       raiseAsync(cap,tso,NULL);
3880   }
3881 }
3882
3883 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3884 static void 
3885 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3886 { // for forkProcess only:
3887   // delete thread without giving it a chance to catch the KillThread exception
3888
3889   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3890       return;
3891   }
3892
3893   if (tso->why_blocked != BlockedOnCCall &&
3894       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3895       unblockThread(cap,tso);
3896   }
3897
3898   tso->what_next = ThreadKilled;
3899 }
3900 #endif
3901
3902 /* -----------------------------------------------------------------------------
3903    raiseExceptionHelper
3904    
3905    This function is called by the raise# primitve, just so that we can
3906    move some of the tricky bits of raising an exception from C-- into
3907    C.  Who knows, it might be a useful re-useable thing here too.
3908    -------------------------------------------------------------------------- */
3909
3910 StgWord
3911 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3912 {
3913     Capability *cap = regTableToCapability(reg);
3914     StgThunk *raise_closure = NULL;
3915     StgPtr p, next;
3916     StgRetInfoTable *info;
3917     //
3918     // This closure represents the expression 'raise# E' where E
3919     // is the exception raise.  It is used to overwrite all the
3920     // thunks which are currently under evaluataion.
3921     //
3922
3923     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
3924     // LDV profiling: stg_raise_info has THUNK as its closure
3925     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3926     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3927     // 1 does not cause any problem unless profiling is performed.
3928     // However, when LDV profiling goes on, we need to linearly scan
3929     // small object pool, where raise_closure is stored, so we should
3930     // use MIN_UPD_SIZE.
3931     //
3932     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3933     //                                 sizeofW(StgClosure)+1);
3934     //
3935
3936     //
3937     // Walk up the stack, looking for the catch frame.  On the way,
3938     // we update any closures pointed to from update frames with the
3939     // raise closure that we just built.
3940     //
3941     p = tso->sp;
3942     while(1) {
3943         info = get_ret_itbl((StgClosure *)p);
3944         next = p + stack_frame_sizeW((StgClosure *)p);
3945         switch (info->i.type) {
3946             
3947         case UPDATE_FRAME:
3948             // Only create raise_closure if we need to.
3949             if (raise_closure == NULL) {
3950                 raise_closure = 
3951                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3952                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3953                 raise_closure->payload[0] = exception;
3954             }
3955             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3956             p = next;
3957             continue;
3958
3959         case ATOMICALLY_FRAME:
3960             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3961             tso->sp = p;
3962             return ATOMICALLY_FRAME;
3963             
3964         case CATCH_FRAME:
3965             tso->sp = p;
3966             return CATCH_FRAME;
3967
3968         case CATCH_STM_FRAME:
3969             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3970             tso->sp = p;
3971             return CATCH_STM_FRAME;
3972             
3973         case STOP_FRAME:
3974             tso->sp = p;
3975             return STOP_FRAME;
3976
3977         case CATCH_RETRY_FRAME:
3978         default:
3979             p = next; 
3980             continue;
3981         }
3982     }
3983 }
3984
3985
3986 /* -----------------------------------------------------------------------------
3987    findRetryFrameHelper
3988
3989    This function is called by the retry# primitive.  It traverses the stack
3990    leaving tso->sp referring to the frame which should handle the retry.  
3991
3992    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3993    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3994
3995    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3996    despite the similar implementation.
3997
3998    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3999    not be created within memory transactions.
4000    -------------------------------------------------------------------------- */
4001
4002 StgWord
4003 findRetryFrameHelper (StgTSO *tso)
4004 {
4005   StgPtr           p, next;
4006   StgRetInfoTable *info;
4007
4008   p = tso -> sp;
4009   while (1) {
4010     info = get_ret_itbl((StgClosure *)p);
4011     next = p + stack_frame_sizeW((StgClosure *)p);
4012     switch (info->i.type) {
4013       
4014     case ATOMICALLY_FRAME:
4015       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
4016       tso->sp = p;
4017       return ATOMICALLY_FRAME;
4018       
4019     case CATCH_RETRY_FRAME:
4020       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
4021       tso->sp = p;
4022       return CATCH_RETRY_FRAME;
4023       
4024     case CATCH_STM_FRAME:
4025     default:
4026       ASSERT(info->i.type != CATCH_FRAME);
4027       ASSERT(info->i.type != STOP_FRAME);
4028       p = next; 
4029       continue;
4030     }
4031   }
4032 }
4033
4034 /* -----------------------------------------------------------------------------
4035    resurrectThreads is called after garbage collection on the list of
4036    threads found to be garbage.  Each of these threads will be woken
4037    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4038    on an MVar, or NonTermination if the thread was blocked on a Black
4039    Hole.
4040
4041    Locks: assumes we hold *all* the capabilities.
4042    -------------------------------------------------------------------------- */
4043
4044 void
4045 resurrectThreads (StgTSO *threads)
4046 {
4047     StgTSO *tso, *next;
4048     Capability *cap;
4049
4050     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4051         next = tso->global_link;
4052         tso->global_link = all_threads;
4053         all_threads = tso;
4054         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4055         
4056         // Wake up the thread on the Capability it was last on for a
4057         // bound thread, or last_free_capability otherwise.
4058         if (tso->bound) {
4059             cap = tso->bound->cap;
4060         } else {
4061             cap = last_free_capability;
4062         }
4063         
4064         switch (tso->why_blocked) {
4065         case BlockedOnMVar:
4066         case BlockedOnException:
4067             /* Called by GC - sched_mutex lock is currently held. */
4068             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4069             break;
4070         case BlockedOnBlackHole:
4071             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4072             break;
4073         case BlockedOnSTM:
4074             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4075             break;
4076         case NotBlocked:
4077             /* This might happen if the thread was blocked on a black hole
4078              * belonging to a thread that we've just woken up (raiseAsync
4079              * can wake up threads, remember...).
4080              */
4081             continue;
4082         default:
4083             barf("resurrectThreads: thread blocked in a strange way");
4084         }
4085     }
4086 }
4087
4088 /* ----------------------------------------------------------------------------
4089  * Debugging: why is a thread blocked
4090  * [Also provides useful information when debugging threaded programs
4091  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4092    ------------------------------------------------------------------------- */
4093
4094 #if DEBUG
4095 static void
4096 printThreadBlockage(StgTSO *tso)
4097 {
4098   switch (tso->why_blocked) {
4099   case BlockedOnRead:
4100     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4101     break;
4102   case BlockedOnWrite:
4103     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4104     break;
4105 #if defined(mingw32_HOST_OS)
4106     case BlockedOnDoProc:
4107     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4108     break;
4109 #endif
4110   case BlockedOnDelay:
4111     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4112     break;
4113   case BlockedOnMVar:
4114     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4115     break;
4116   case BlockedOnException:
4117     debugBelch("is blocked on delivering an exception to thread %d",
4118             tso->block_info.tso->id);
4119     break;
4120   case BlockedOnBlackHole:
4121     debugBelch("is blocked on a black hole");
4122     break;
4123   case NotBlocked:
4124     debugBelch("is not blocked");
4125     break;
4126 #if defined(PARALLEL_HASKELL)
4127   case BlockedOnGA:
4128     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4129             tso->block_info.closure, info_type(tso->block_info.closure));
4130     break;
4131   case BlockedOnGA_NoSend:
4132     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4133             tso->block_info.closure, info_type(tso->block_info.closure));
4134     break;
4135 #endif
4136   case BlockedOnCCall:
4137     debugBelch("is blocked on an external call");
4138     break;
4139   case BlockedOnCCall_NoUnblockExc:
4140     debugBelch("is blocked on an external call (exceptions were already blocked)");
4141     break;
4142   case BlockedOnSTM:
4143     debugBelch("is blocked on an STM operation");
4144     break;
4145   default:
4146     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4147          tso->why_blocked, tso->id, tso);
4148   }
4149 }
4150
4151 void
4152 printThreadStatus(StgTSO *t)
4153 {
4154     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4155     {
4156       void *label = lookupThreadLabel(t->id);
4157       if (label) debugBelch("[\"%s\"] ",(char *)label);
4158     }
4159     if (t->what_next == ThreadRelocated) {
4160         debugBelch("has been relocated...\n");
4161     } else {
4162         switch (t->what_next) {
4163         case ThreadKilled:
4164             debugBelch("has been killed");
4165             break;
4166         case ThreadComplete:
4167             debugBelch("has completed");
4168             break;
4169         default:
4170             printThreadBlockage(t);
4171         }
4172         debugBelch("\n");
4173     }
4174 }
4175
4176 void
4177 printAllThreads(void)
4178 {
4179   StgTSO *t, *next;
4180   nat i;
4181   Capability *cap;
4182
4183 # if defined(GRAN)
4184   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4185   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4186                        time_string, rtsFalse/*no commas!*/);
4187
4188   debugBelch("all threads at [%s]:\n", time_string);
4189 # elif defined(PARALLEL_HASKELL)
4190   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4191   ullong_format_string(CURRENT_TIME,
4192                        time_string, rtsFalse/*no commas!*/);
4193
4194   debugBelch("all threads at [%s]:\n", time_string);
4195 # else
4196   debugBelch("all threads:\n");
4197 # endif
4198
4199   for (i = 0; i < n_capabilities; i++) {
4200       cap = &capabilities[i];
4201       debugBelch("threads on capability %d:\n", cap->no);
4202       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4203           printThreadStatus(t);
4204       }
4205   }
4206
4207   debugBelch("other threads:\n");
4208   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4209       if (t->why_blocked != NotBlocked) {
4210           printThreadStatus(t);
4211       }
4212       if (t->what_next == ThreadRelocated) {
4213           next = t->link;
4214       } else {
4215           next = t->global_link;
4216       }
4217   }
4218 }
4219
4220 // useful from gdb
4221 void 
4222 printThreadQueue(StgTSO *t)
4223 {
4224     nat i = 0;
4225     for (; t != END_TSO_QUEUE; t = t->link) {
4226         printThreadStatus(t);
4227         i++;
4228     }
4229     debugBelch("%d threads on queue\n", i);
4230 }
4231
4232 /* 
4233    Print a whole blocking queue attached to node (debugging only).
4234 */
4235 # if defined(PARALLEL_HASKELL)
4236 void 
4237 print_bq (StgClosure *node)
4238 {
4239   StgBlockingQueueElement *bqe;
4240   StgTSO *tso;
4241   rtsBool end;
4242
4243   debugBelch("## BQ of closure %p (%s): ",
4244           node, info_type(node));
4245
4246   /* should cover all closures that may have a blocking queue */
4247   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4248          get_itbl(node)->type == FETCH_ME_BQ ||
4249          get_itbl(node)->type == RBH ||
4250          get_itbl(node)->type == MVAR);
4251     
4252   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4253
4254   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4255 }
4256
4257 /* 
4258    Print a whole blocking queue starting with the element bqe.
4259 */
4260 void 
4261 print_bqe (StgBlockingQueueElement *bqe)
4262 {
4263   rtsBool end;
4264
4265   /* 
4266      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4267   */
4268   for (end = (bqe==END_BQ_QUEUE);
4269        !end; // iterate until bqe points to a CONSTR
4270        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4271        bqe = end ? END_BQ_QUEUE : bqe->link) {
4272     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4273     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4274     /* types of closures that may appear in a blocking queue */
4275     ASSERT(get_itbl(bqe)->type == TSO ||           
4276            get_itbl(bqe)->type == BLOCKED_FETCH || 
4277            get_itbl(bqe)->type == CONSTR); 
4278     /* only BQs of an RBH end with an RBH_Save closure */
4279     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4280
4281     switch (get_itbl(bqe)->type) {
4282     case TSO:
4283       debugBelch(" TSO %u (%x),",
4284               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4285       break;
4286     case BLOCKED_FETCH:
4287       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4288               ((StgBlockedFetch *)bqe)->node, 
4289               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4290               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4291               ((StgBlockedFetch *)bqe)->ga.weight);
4292       break;
4293     case CONSTR:
4294       debugBelch(" %s (IP %p),",
4295               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4296                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4297                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4298                "RBH_Save_?"), get_itbl(bqe));
4299       break;
4300     default:
4301       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4302            info_type((StgClosure *)bqe)); // , node, info_type(node));
4303       break;
4304     }
4305   } /* for */
4306   debugBelch("\n");
4307 }
4308 # elif defined(GRAN)
4309 void 
4310 print_bq (StgClosure *node)
4311 {
4312   StgBlockingQueueElement *bqe;
4313   PEs node_loc, tso_loc;
4314   rtsBool end;
4315
4316   /* should cover all closures that may have a blocking queue */
4317   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4318          get_itbl(node)->type == FETCH_ME_BQ ||
4319          get_itbl(node)->type == RBH);
4320     
4321   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4322   node_loc = where_is(node);
4323
4324   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4325           node, info_type(node), node_loc);
4326
4327   /* 
4328      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4329   */
4330   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4331        !end; // iterate until bqe points to a CONSTR
4332        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4333     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4334     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4335     /* types of closures that may appear in a blocking queue */
4336     ASSERT(get_itbl(bqe)->type == TSO ||           
4337            get_itbl(bqe)->type == CONSTR); 
4338     /* only BQs of an RBH end with an RBH_Save closure */
4339     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4340
4341     tso_loc = where_is((StgClosure *)bqe);
4342     switch (get_itbl(bqe)->type) {
4343     case TSO:
4344       debugBelch(" TSO %d (%p) on [PE %d],",
4345               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4346       break;
4347     case CONSTR:
4348       debugBelch(" %s (IP %p),",
4349               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4350                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4351                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4352                "RBH_Save_?"), get_itbl(bqe));
4353       break;
4354     default:
4355       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4356            info_type((StgClosure *)bqe), node, info_type(node));
4357       break;
4358     }
4359   } /* for */
4360   debugBelch("\n");
4361 }
4362 # endif
4363
4364 #if defined(PARALLEL_HASKELL)
4365 static nat
4366 run_queue_len(void)
4367 {
4368     nat i;
4369     StgTSO *tso;
4370     
4371     for (i=0, tso=run_queue_hd; 
4372          tso != END_TSO_QUEUE;
4373          i++, tso=tso->link) {
4374         /* nothing */
4375     }
4376         
4377     return i;
4378 }
4379 #endif
4380
4381 void
4382 sched_belch(char *s, ...)
4383 {
4384     va_list ap;
4385     va_start(ap,s);
4386 #ifdef THREADED_RTS
4387     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4388 #elif defined(PARALLEL_HASKELL)
4389     debugBelch("== ");
4390 #else
4391     debugBelch("sched: ");
4392 #endif
4393     vdebugBelch(s, ap);
4394     debugBelch("\n");
4395     va_end(ap);
4396 }
4397
4398 #endif /* DEBUG */