extra sanity checking: call checkTSO() in resumeThread()
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 /* -----------------------------------------------------------------------------
76  * Global variables
77  * -------------------------------------------------------------------------- */
78
79 #if defined(GRAN)
80
81 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
82 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
83
84 /* 
85    In GranSim we have a runnable and a blocked queue for each processor.
86    In order to minimise code changes new arrays run_queue_hds/tls
87    are created. run_queue_hd is then a short cut (macro) for
88    run_queue_hds[CurrentProc] (see GranSim.h).
89    -- HWL
90 */
91 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
92 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
93 StgTSO *ccalling_threadss[MAX_PROC];
94 /* We use the same global list of threads (all_threads) in GranSim as in
95    the std RTS (i.e. we are cheating). However, we don't use this list in
96    the GranSim specific code at the moment (so we are only potentially
97    cheating).  */
98
99 #else /* !GRAN */
100
101 #if !defined(THREADED_RTS)
102 // Blocked/sleeping thrads
103 StgTSO *blocked_queue_hd = NULL;
104 StgTSO *blocked_queue_tl = NULL;
105 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
106 #endif
107
108 /* Threads blocked on blackholes.
109  * LOCK: sched_mutex+capability, or all capabilities
110  */
111 StgTSO *blackhole_queue = NULL;
112 #endif
113
114 /* The blackhole_queue should be checked for threads to wake up.  See
115  * Schedule.h for more thorough comment.
116  * LOCK: none (doesn't matter if we miss an update)
117  */
118 rtsBool blackholes_need_checking = rtsFalse;
119
120 /* Linked list of all threads.
121  * Used for detecting garbage collected threads.
122  * LOCK: sched_mutex+capability, or all capabilities
123  */
124 StgTSO *all_threads = NULL;
125
126 /* flag set by signal handler to precipitate a context switch
127  * LOCK: none (just an advisory flag)
128  */
129 int context_switch = 0;
130
131 /* flag that tracks whether we have done any execution in this time slice.
132  * LOCK: currently none, perhaps we should lock (but needs to be
133  * updated in the fast path of the scheduler).
134  */
135 nat recent_activity = ACTIVITY_YES;
136
137 /* if this flag is set as well, give up execution
138  * LOCK: none (changes once, from false->true)
139  */
140 rtsBool interrupted = rtsFalse;
141
142 /* Next thread ID to allocate.
143  * LOCK: sched_mutex
144  */
145 static StgThreadID next_thread_id = 1;
146
147 /* The smallest stack size that makes any sense is:
148  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
149  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
150  *  + 1                       (the closure to enter)
151  *  + 1                       (stg_ap_v_ret)
152  *  + 1                       (spare slot req'd by stg_ap_v_ret)
153  *
154  * A thread with this stack will bomb immediately with a stack
155  * overflow, which will increase its stack size.  
156  */
157 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
158
159 #if defined(GRAN)
160 StgTSO *CurrentTSO;
161 #endif
162
163 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
164  *  exists - earlier gccs apparently didn't.
165  *  -= chak
166  */
167 StgTSO dummy_tso;
168
169 /*
170  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
171  * in an MT setting, needed to signal that a worker thread shouldn't hang around
172  * in the scheduler when it is out of work.
173  */
174 rtsBool shutting_down_scheduler = rtsFalse;
175
176 /*
177  * This mutex protects most of the global scheduler data in
178  * the THREADED_RTS runtime.
179  */
180 #if defined(THREADED_RTS)
181 Mutex sched_mutex;
182 #endif
183
184 #if defined(PARALLEL_HASKELL)
185 StgTSO *LastTSO;
186 rtsTime TimeOfLastYield;
187 rtsBool emitSchedule = rtsTrue;
188 #endif
189
190 /* -----------------------------------------------------------------------------
191  * static function prototypes
192  * -------------------------------------------------------------------------- */
193
194 static Capability *schedule (Capability *initialCapability, Task *task);
195
196 //
197 // These function all encapsulate parts of the scheduler loop, and are
198 // abstracted only to make the structure and control flow of the
199 // scheduler clearer.
200 //
201 static void schedulePreLoop (void);
202 #if defined(THREADED_RTS)
203 static void schedulePushWork(Capability *cap, Task *task);
204 #endif
205 static void scheduleStartSignalHandlers (Capability *cap);
206 static void scheduleCheckBlockedThreads (Capability *cap);
207 static void scheduleCheckBlackHoles (Capability *cap);
208 static void scheduleDetectDeadlock (Capability *cap, Task *task);
209 #if defined(GRAN)
210 static StgTSO *scheduleProcessEvent(rtsEvent *event);
211 #endif
212 #if defined(PARALLEL_HASKELL)
213 static StgTSO *scheduleSendPendingMessages(void);
214 static void scheduleActivateSpark(void);
215 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
216 #endif
217 #if defined(PAR) || defined(GRAN)
218 static void scheduleGranParReport(void);
219 #endif
220 static void schedulePostRunThread(void);
221 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
222 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
223                                          StgTSO *t);
224 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
225                                     nat prev_what_next );
226 static void scheduleHandleThreadBlocked( StgTSO *t );
227 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
228                                              StgTSO *t );
229 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
230 static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major,
231                          void (*get_roots)(evac_fn));
232
233 static void unblockThread(Capability *cap, StgTSO *tso);
234 static rtsBool checkBlackHoles(Capability *cap);
235 static void AllRoots(evac_fn evac);
236
237 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
238
239 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
240                         rtsBool stop_at_atomically, StgPtr stop_here);
241
242 static void deleteThread (Capability *cap, StgTSO *tso);
243 static void deleteRunQueue (Capability *cap);
244
245 #ifdef DEBUG
246 static void printThreadBlockage(StgTSO *tso);
247 static void printThreadStatus(StgTSO *tso);
248 void printThreadQueue(StgTSO *tso);
249 #endif
250
251 #if defined(PARALLEL_HASKELL)
252 StgTSO * createSparkThread(rtsSpark spark);
253 StgTSO * activateSpark (rtsSpark spark);  
254 #endif
255
256 #ifdef DEBUG
257 static char *whatNext_strs[] = {
258   "(unknown)",
259   "ThreadRunGHC",
260   "ThreadInterpret",
261   "ThreadKilled",
262   "ThreadRelocated",
263   "ThreadComplete"
264 };
265 #endif
266
267 /* -----------------------------------------------------------------------------
268  * Putting a thread on the run queue: different scheduling policies
269  * -------------------------------------------------------------------------- */
270
271 STATIC_INLINE void
272 addToRunQueue( Capability *cap, StgTSO *t )
273 {
274 #if defined(PARALLEL_HASKELL)
275     if (RtsFlags.ParFlags.doFairScheduling) { 
276         // this does round-robin scheduling; good for concurrency
277         appendToRunQueue(cap,t);
278     } else {
279         // this does unfair scheduling; good for parallelism
280         pushOnRunQueue(cap,t);
281     }
282 #else
283     // this does round-robin scheduling; good for concurrency
284     appendToRunQueue(cap,t);
285 #endif
286 }
287
288 /* ---------------------------------------------------------------------------
289    Main scheduling loop.
290
291    We use round-robin scheduling, each thread returning to the
292    scheduler loop when one of these conditions is detected:
293
294       * out of heap space
295       * timer expires (thread yields)
296       * thread blocks
297       * thread ends
298       * stack overflow
299
300    GRAN version:
301      In a GranSim setup this loop iterates over the global event queue.
302      This revolves around the global event queue, which determines what 
303      to do next. Therefore, it's more complicated than either the 
304      concurrent or the parallel (GUM) setup.
305
306    GUM version:
307      GUM iterates over incoming messages.
308      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
309      and sends out a fish whenever it has nothing to do; in-between
310      doing the actual reductions (shared code below) it processes the
311      incoming messages and deals with delayed operations 
312      (see PendingFetches).
313      This is not the ugliest code you could imagine, but it's bloody close.
314
315    ------------------------------------------------------------------------ */
316
317 static Capability *
318 schedule (Capability *initialCapability, Task *task)
319 {
320   StgTSO *t;
321   Capability *cap;
322   StgThreadReturnCode ret;
323 #if defined(GRAN)
324   rtsEvent *event;
325 #elif defined(PARALLEL_HASKELL)
326   StgTSO *tso;
327   GlobalTaskId pe;
328   rtsBool receivedFinish = rtsFalse;
329 # if defined(DEBUG)
330   nat tp_size, sp_size; // stats only
331 # endif
332 #endif
333   nat prev_what_next;
334   rtsBool ready_to_gc;
335 #if defined(THREADED_RTS)
336   rtsBool first = rtsTrue;
337 #endif
338   
339   cap = initialCapability;
340
341   // Pre-condition: this task owns initialCapability.
342   // The sched_mutex is *NOT* held
343   // NB. on return, we still hold a capability.
344
345   IF_DEBUG(scheduler,
346            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
347                        task, initialCapability);
348       );
349
350   schedulePreLoop();
351
352   // -----------------------------------------------------------
353   // Scheduler loop starts here:
354
355 #if defined(PARALLEL_HASKELL)
356 #define TERMINATION_CONDITION        (!receivedFinish)
357 #elif defined(GRAN)
358 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
359 #else
360 #define TERMINATION_CONDITION        rtsTrue
361 #endif
362
363   while (TERMINATION_CONDITION) {
364
365 #if defined(GRAN)
366       /* Choose the processor with the next event */
367       CurrentProc = event->proc;
368       CurrentTSO = event->tso;
369 #endif
370
371 #if defined(THREADED_RTS)
372       if (first) {
373           // don't yield the first time, we want a chance to run this
374           // thread for a bit, even if there are others banging at the
375           // door.
376           first = rtsFalse;
377           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
378       } else {
379           // Yield the capability to higher-priority tasks if necessary.
380           yieldCapability(&cap, task);
381       }
382 #endif
383       
384 #if defined(THREADED_RTS)
385       schedulePushWork(cap,task);
386 #endif
387
388     // Check whether we have re-entered the RTS from Haskell without
389     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
390     // call).
391     if (cap->in_haskell) {
392           errorBelch("schedule: re-entered unsafely.\n"
393                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
394           stg_exit(EXIT_FAILURE);
395     }
396
397     //
398     // Test for interruption.  If interrupted==rtsTrue, then either
399     // we received a keyboard interrupt (^C), or the scheduler is
400     // trying to shut down all the tasks (shutting_down_scheduler) in
401     // the threaded RTS.
402     //
403     if (interrupted) {
404         deleteRunQueue(cap);
405 #if defined(THREADED_RTS)
406         discardSparksCap(cap);
407 #endif
408         if (shutting_down_scheduler) {
409             IF_DEBUG(scheduler, sched_belch("shutting down"));
410             // If we are a worker, just exit.  If we're a bound thread
411             // then we will exit below when we've removed our TSO from
412             // the run queue.
413             if (task->tso == NULL && emptyRunQueue(cap)) {
414                 return cap;
415             }
416         } else {
417             IF_DEBUG(scheduler, sched_belch("interrupted"));
418         }
419     }
420
421 #if defined(THREADED_RTS)
422     // If the run queue is empty, take a spark and turn it into a thread.
423     {
424         if (emptyRunQueue(cap)) {
425             StgClosure *spark;
426             spark = findSpark(cap);
427             if (spark != NULL) {
428                 IF_DEBUG(scheduler,
429                          sched_belch("turning spark of closure %p into a thread",
430                                      (StgClosure *)spark));
431                 createSparkThread(cap,spark);     
432             }
433         }
434     }
435 #endif // THREADED_RTS
436
437     scheduleStartSignalHandlers(cap);
438
439     // Only check the black holes here if we've nothing else to do.
440     // During normal execution, the black hole list only gets checked
441     // at GC time, to avoid repeatedly traversing this possibly long
442     // list each time around the scheduler.
443     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
444
445     scheduleCheckBlockedThreads(cap);
446
447     scheduleDetectDeadlock(cap,task);
448 #if defined(THREADED_RTS)
449     cap = task->cap;    // reload cap, it might have changed
450 #endif
451
452     // Normally, the only way we can get here with no threads to
453     // run is if a keyboard interrupt received during 
454     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
455     // Additionally, it is not fatal for the
456     // threaded RTS to reach here with no threads to run.
457     //
458     // win32: might be here due to awaitEvent() being abandoned
459     // as a result of a console event having been delivered.
460     if ( emptyRunQueue(cap) ) {
461 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
462         ASSERT(interrupted);
463 #endif
464         continue; // nothing to do
465     }
466
467 #if defined(PARALLEL_HASKELL)
468     scheduleSendPendingMessages();
469     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
470         continue;
471
472 #if defined(SPARKS)
473     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
474 #endif
475
476     /* If we still have no work we need to send a FISH to get a spark
477        from another PE */
478     if (emptyRunQueue(cap)) {
479         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
480         ASSERT(rtsFalse); // should not happen at the moment
481     }
482     // from here: non-empty run queue.
483     //  TODO: merge above case with this, only one call processMessages() !
484     if (PacketsWaiting()) {  /* process incoming messages, if
485                                 any pending...  only in else
486                                 because getRemoteWork waits for
487                                 messages as well */
488         receivedFinish = processMessages();
489     }
490 #endif
491
492 #if defined(GRAN)
493     scheduleProcessEvent(event);
494 #endif
495
496     // 
497     // Get a thread to run
498     //
499     t = popRunQueue(cap);
500
501 #if defined(GRAN) || defined(PAR)
502     scheduleGranParReport(); // some kind of debuging output
503 #else
504     // Sanity check the thread we're about to run.  This can be
505     // expensive if there is lots of thread switching going on...
506     IF_DEBUG(sanity,checkTSO(t));
507 #endif
508
509 #if defined(THREADED_RTS)
510     // Check whether we can run this thread in the current task.
511     // If not, we have to pass our capability to the right task.
512     {
513         Task *bound = t->bound;
514       
515         if (bound) {
516             if (bound == task) {
517                 IF_DEBUG(scheduler,
518                          sched_belch("### Running thread %d in bound thread",
519                                      t->id));
520                 // yes, the Haskell thread is bound to the current native thread
521             } else {
522                 IF_DEBUG(scheduler,
523                          sched_belch("### thread %d bound to another OS thread",
524                                      t->id));
525                 // no, bound to a different Haskell thread: pass to that thread
526                 pushOnRunQueue(cap,t);
527                 continue;
528             }
529         } else {
530             // The thread we want to run is unbound.
531             if (task->tso) { 
532                 IF_DEBUG(scheduler,
533                          sched_belch("### this OS thread cannot run thread %d", t->id));
534                 // no, the current native thread is bound to a different
535                 // Haskell thread, so pass it to any worker thread
536                 pushOnRunQueue(cap,t);
537                 continue; 
538             }
539         }
540     }
541 #endif
542
543     cap->r.rCurrentTSO = t;
544     
545     /* context switches are initiated by the timer signal, unless
546      * the user specified "context switch as often as possible", with
547      * +RTS -C0
548      */
549     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
550         && !emptyThreadQueues(cap)) {
551         context_switch = 1;
552     }
553          
554 run_thread:
555
556     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
557                               (long)t->id, whatNext_strs[t->what_next]));
558
559 #if defined(PROFILING)
560     startHeapProfTimer();
561 #endif
562
563     // ----------------------------------------------------------------------
564     // Run the current thread 
565
566     prev_what_next = t->what_next;
567
568     errno = t->saved_errno;
569     cap->in_haskell = rtsTrue;
570
571     dirtyTSO(t);
572
573     recent_activity = ACTIVITY_YES;
574
575     switch (prev_what_next) {
576         
577     case ThreadKilled:
578     case ThreadComplete:
579         /* Thread already finished, return to scheduler. */
580         ret = ThreadFinished;
581         break;
582         
583     case ThreadRunGHC:
584     {
585         StgRegTable *r;
586         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
587         cap = regTableToCapability(r);
588         ret = r->rRet;
589         break;
590     }
591     
592     case ThreadInterpret:
593         cap = interpretBCO(cap);
594         ret = cap->r.rRet;
595         break;
596         
597     default:
598         barf("schedule: invalid what_next field");
599     }
600
601     cap->in_haskell = rtsFalse;
602
603     // The TSO might have moved, eg. if it re-entered the RTS and a GC
604     // happened.  So find the new location:
605     t = cap->r.rCurrentTSO;
606
607     // We have run some Haskell code: there might be blackhole-blocked
608     // threads to wake up now.
609     // Lock-free test here should be ok, we're just setting a flag.
610     if ( blackhole_queue != END_TSO_QUEUE ) {
611         blackholes_need_checking = rtsTrue;
612     }
613     
614     // And save the current errno in this thread.
615     // XXX: possibly bogus for SMP because this thread might already
616     // be running again, see code below.
617     t->saved_errno = errno;
618
619 #if defined(THREADED_RTS)
620     // If ret is ThreadBlocked, and this Task is bound to the TSO that
621     // blocked, we are in limbo - the TSO is now owned by whatever it
622     // is blocked on, and may in fact already have been woken up,
623     // perhaps even on a different Capability.  It may be the case
624     // that task->cap != cap.  We better yield this Capability
625     // immediately and return to normaility.
626     if (ret == ThreadBlocked) {
627         IF_DEBUG(scheduler,
628                  sched_belch("--<< thread %d (%s) stopped: blocked\n",
629                              t->id, whatNext_strs[t->what_next]));
630         continue;
631     }
632 #endif
633
634     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
635
636     // ----------------------------------------------------------------------
637     
638     // Costs for the scheduler are assigned to CCS_SYSTEM
639 #if defined(PROFILING)
640     stopHeapProfTimer();
641     CCCS = CCS_SYSTEM;
642 #endif
643     
644 #if defined(THREADED_RTS)
645     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
646 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
647     IF_DEBUG(scheduler,debugBelch("sched: "););
648 #endif
649     
650     schedulePostRunThread();
651
652     ready_to_gc = rtsFalse;
653
654     switch (ret) {
655     case HeapOverflow:
656         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
657         break;
658
659     case StackOverflow:
660         scheduleHandleStackOverflow(cap,task,t);
661         break;
662
663     case ThreadYielding:
664         if (scheduleHandleYield(cap, t, prev_what_next)) {
665             // shortcut for switching between compiler/interpreter:
666             goto run_thread; 
667         }
668         break;
669
670     case ThreadBlocked:
671         scheduleHandleThreadBlocked(t);
672         break;
673
674     case ThreadFinished:
675         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
676         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
677         break;
678
679     default:
680       barf("schedule: invalid thread return code %d", (int)ret);
681     }
682
683     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
684     if (ready_to_gc) {
685       scheduleDoGC(cap,task,rtsFalse,GetRoots);
686 #if defined(THREADED_RTS)
687       cap = task->cap;    // reload cap, it might have changed  
688 #endif
689     }
690   } /* end of while() */
691
692   IF_PAR_DEBUG(verbose,
693                debugBelch("== Leaving schedule() after having received Finish\n"));
694 }
695
696 /* ----------------------------------------------------------------------------
697  * Setting up the scheduler loop
698  * ------------------------------------------------------------------------- */
699
700 static void
701 schedulePreLoop(void)
702 {
703 #if defined(GRAN) 
704     /* set up first event to get things going */
705     /* ToDo: assign costs for system setup and init MainTSO ! */
706     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
707               ContinueThread, 
708               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
709     
710     IF_DEBUG(gran,
711              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
712                         CurrentTSO);
713              G_TSO(CurrentTSO, 5));
714     
715     if (RtsFlags.GranFlags.Light) {
716         /* Save current time; GranSim Light only */
717         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
718     }      
719 #endif
720 }
721
722 /* -----------------------------------------------------------------------------
723  * schedulePushWork()
724  *
725  * Push work to other Capabilities if we have some.
726  * -------------------------------------------------------------------------- */
727
728 #if defined(THREADED_RTS)
729 static void
730 schedulePushWork(Capability *cap USED_IF_THREADS, 
731                  Task *task      USED_IF_THREADS)
732 {
733     Capability *free_caps[n_capabilities], *cap0;
734     nat i, n_free_caps;
735
736     // Check whether we have more threads on our run queue, or sparks
737     // in our pool, that we could hand to another Capability.
738     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
739         && sparkPoolSizeCap(cap) < 2) {
740         return;
741     }
742
743     // First grab as many free Capabilities as we can.
744     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
745         cap0 = &capabilities[i];
746         if (cap != cap0 && tryGrabCapability(cap0,task)) {
747             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
748                 // it already has some work, we just grabbed it at 
749                 // the wrong moment.  Or maybe it's deadlocked!
750                 releaseCapability(cap0);
751             } else {
752                 free_caps[n_free_caps++] = cap0;
753             }
754         }
755     }
756
757     // we now have n_free_caps free capabilities stashed in
758     // free_caps[].  Share our run queue equally with them.  This is
759     // probably the simplest thing we could do; improvements we might
760     // want to do include:
761     //
762     //   - giving high priority to moving relatively new threads, on 
763     //     the gournds that they haven't had time to build up a
764     //     working set in the cache on this CPU/Capability.
765     //
766     //   - giving low priority to moving long-lived threads
767
768     if (n_free_caps > 0) {
769         StgTSO *prev, *t, *next;
770         rtsBool pushed_to_all;
771
772         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
773
774         i = 0;
775         pushed_to_all = rtsFalse;
776
777         if (cap->run_queue_hd != END_TSO_QUEUE) {
778             prev = cap->run_queue_hd;
779             t = prev->link;
780             prev->link = END_TSO_QUEUE;
781             for (; t != END_TSO_QUEUE; t = next) {
782                 next = t->link;
783                 t->link = END_TSO_QUEUE;
784                 if (t->what_next == ThreadRelocated
785                     || t->bound == task) { // don't move my bound thread
786                     prev->link = t;
787                     prev = t;
788                 } else if (i == n_free_caps) {
789                     pushed_to_all = rtsTrue;
790                     i = 0;
791                     // keep one for us
792                     prev->link = t;
793                     prev = t;
794                 } else {
795                     IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
796                     appendToRunQueue(free_caps[i],t);
797                     if (t->bound) { t->bound->cap = free_caps[i]; }
798                     i++;
799                 }
800             }
801             cap->run_queue_tl = prev;
802         }
803
804         // If there are some free capabilities that we didn't push any
805         // threads to, then try to push a spark to each one.
806         if (!pushed_to_all) {
807             StgClosure *spark;
808             // i is the next free capability to push to
809             for (; i < n_free_caps; i++) {
810                 if (emptySparkPoolCap(free_caps[i])) {
811                     spark = findSpark(cap);
812                     if (spark != NULL) {
813                         IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
814                         newSpark(&(free_caps[i]->r), spark);
815                     }
816                 }
817             }
818         }
819
820         // release the capabilities
821         for (i = 0; i < n_free_caps; i++) {
822             task->cap = free_caps[i];
823             releaseCapability(free_caps[i]);
824         }
825     }
826     task->cap = cap; // reset to point to our Capability.
827 }
828 #endif
829
830 /* ----------------------------------------------------------------------------
831  * Start any pending signal handlers
832  * ------------------------------------------------------------------------- */
833
834 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
835 static void
836 scheduleStartSignalHandlers(Capability *cap)
837 {
838     if (signals_pending()) { // safe outside the lock
839         startSignalHandlers(cap);
840     }
841 }
842 #else
843 static void
844 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
845 {
846 }
847 #endif
848
849 /* ----------------------------------------------------------------------------
850  * Check for blocked threads that can be woken up.
851  * ------------------------------------------------------------------------- */
852
853 static void
854 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
855 {
856 #if !defined(THREADED_RTS)
857     //
858     // Check whether any waiting threads need to be woken up.  If the
859     // run queue is empty, and there are no other tasks running, we
860     // can wait indefinitely for something to happen.
861     //
862     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
863     {
864         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
865     }
866 #endif
867 }
868
869
870 /* ----------------------------------------------------------------------------
871  * Check for threads blocked on BLACKHOLEs that can be woken up
872  * ------------------------------------------------------------------------- */
873 static void
874 scheduleCheckBlackHoles (Capability *cap)
875 {
876     if ( blackholes_need_checking ) // check without the lock first
877     {
878         ACQUIRE_LOCK(&sched_mutex);
879         if ( blackholes_need_checking ) {
880             checkBlackHoles(cap);
881             blackholes_need_checking = rtsFalse;
882         }
883         RELEASE_LOCK(&sched_mutex);
884     }
885 }
886
887 /* ----------------------------------------------------------------------------
888  * Detect deadlock conditions and attempt to resolve them.
889  * ------------------------------------------------------------------------- */
890
891 static void
892 scheduleDetectDeadlock (Capability *cap, Task *task)
893 {
894
895 #if defined(PARALLEL_HASKELL)
896     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
897     return;
898 #endif
899
900     /* 
901      * Detect deadlock: when we have no threads to run, there are no
902      * threads blocked, waiting for I/O, or sleeping, and all the
903      * other tasks are waiting for work, we must have a deadlock of
904      * some description.
905      */
906     if ( emptyThreadQueues(cap) )
907     {
908 #if defined(THREADED_RTS)
909         /* 
910          * In the threaded RTS, we only check for deadlock if there
911          * has been no activity in a complete timeslice.  This means
912          * we won't eagerly start a full GC just because we don't have
913          * any threads to run currently.
914          */
915         if (recent_activity != ACTIVITY_INACTIVE) return;
916 #endif
917
918         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
919
920         // Garbage collection can release some new threads due to
921         // either (a) finalizers or (b) threads resurrected because
922         // they are unreachable and will therefore be sent an
923         // exception.  Any threads thus released will be immediately
924         // runnable.
925         scheduleDoGC( cap, task, rtsTrue/*force  major GC*/, GetRoots );
926 #if defined(THREADED_RTS)
927         cap = task->cap;    // reload cap, it might have changed
928 #endif
929
930         recent_activity = ACTIVITY_DONE_GC;
931         
932         if ( !emptyRunQueue(cap) ) return;
933
934 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
935         /* If we have user-installed signal handlers, then wait
936          * for signals to arrive rather then bombing out with a
937          * deadlock.
938          */
939         if ( anyUserHandlers() ) {
940             IF_DEBUG(scheduler, 
941                      sched_belch("still deadlocked, waiting for signals..."));
942
943             awaitUserSignals();
944
945             if (signals_pending()) {
946                 startSignalHandlers(cap);
947             }
948
949             // either we have threads to run, or we were interrupted:
950             ASSERT(!emptyRunQueue(cap) || interrupted);
951         }
952 #endif
953
954 #if !defined(THREADED_RTS)
955         /* Probably a real deadlock.  Send the current main thread the
956          * Deadlock exception.
957          */
958         if (task->tso) {
959             switch (task->tso->why_blocked) {
960             case BlockedOnSTM:
961             case BlockedOnBlackHole:
962             case BlockedOnException:
963             case BlockedOnMVar:
964                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
965                 return;
966             default:
967                 barf("deadlock: main thread blocked in a strange way");
968             }
969         }
970         return;
971 #endif
972     }
973 }
974
975 /* ----------------------------------------------------------------------------
976  * Process an event (GRAN only)
977  * ------------------------------------------------------------------------- */
978
979 #if defined(GRAN)
980 static StgTSO *
981 scheduleProcessEvent(rtsEvent *event)
982 {
983     StgTSO *t;
984
985     if (RtsFlags.GranFlags.Light)
986       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
987
988     /* adjust time based on time-stamp */
989     if (event->time > CurrentTime[CurrentProc] &&
990         event->evttype != ContinueThread)
991       CurrentTime[CurrentProc] = event->time;
992     
993     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
994     if (!RtsFlags.GranFlags.Light)
995       handleIdlePEs();
996
997     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
998
999     /* main event dispatcher in GranSim */
1000     switch (event->evttype) {
1001       /* Should just be continuing execution */
1002     case ContinueThread:
1003       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1004       /* ToDo: check assertion
1005       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1006              run_queue_hd != END_TSO_QUEUE);
1007       */
1008       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1009       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1010           procStatus[CurrentProc]==Fetching) {
1011         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1012               CurrentTSO->id, CurrentTSO, CurrentProc);
1013         goto next_thread;
1014       } 
1015       /* Ignore ContinueThreads for completed threads */
1016       if (CurrentTSO->what_next == ThreadComplete) {
1017         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1018               CurrentTSO->id, CurrentTSO, CurrentProc);
1019         goto next_thread;
1020       } 
1021       /* Ignore ContinueThreads for threads that are being migrated */
1022       if (PROCS(CurrentTSO)==Nowhere) { 
1023         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1024               CurrentTSO->id, CurrentTSO, CurrentProc);
1025         goto next_thread;
1026       }
1027       /* The thread should be at the beginning of the run queue */
1028       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1029         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1030               CurrentTSO->id, CurrentTSO, CurrentProc);
1031         break; // run the thread anyway
1032       }
1033       /*
1034       new_event(proc, proc, CurrentTime[proc],
1035                 FindWork,
1036                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1037       goto next_thread; 
1038       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1039       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1040
1041     case FetchNode:
1042       do_the_fetchnode(event);
1043       goto next_thread;             /* handle next event in event queue  */
1044       
1045     case GlobalBlock:
1046       do_the_globalblock(event);
1047       goto next_thread;             /* handle next event in event queue  */
1048       
1049     case FetchReply:
1050       do_the_fetchreply(event);
1051       goto next_thread;             /* handle next event in event queue  */
1052       
1053     case UnblockThread:   /* Move from the blocked queue to the tail of */
1054       do_the_unblock(event);
1055       goto next_thread;             /* handle next event in event queue  */
1056       
1057     case ResumeThread:  /* Move from the blocked queue to the tail of */
1058       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1059       event->tso->gran.blocktime += 
1060         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1061       do_the_startthread(event);
1062       goto next_thread;             /* handle next event in event queue  */
1063       
1064     case StartThread:
1065       do_the_startthread(event);
1066       goto next_thread;             /* handle next event in event queue  */
1067       
1068     case MoveThread:
1069       do_the_movethread(event);
1070       goto next_thread;             /* handle next event in event queue  */
1071       
1072     case MoveSpark:
1073       do_the_movespark(event);
1074       goto next_thread;             /* handle next event in event queue  */
1075       
1076     case FindWork:
1077       do_the_findwork(event);
1078       goto next_thread;             /* handle next event in event queue  */
1079       
1080     default:
1081       barf("Illegal event type %u\n", event->evttype);
1082     }  /* switch */
1083     
1084     /* This point was scheduler_loop in the old RTS */
1085
1086     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1087
1088     TimeOfLastEvent = CurrentTime[CurrentProc];
1089     TimeOfNextEvent = get_time_of_next_event();
1090     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1091     // CurrentTSO = ThreadQueueHd;
1092
1093     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1094                          TimeOfNextEvent));
1095
1096     if (RtsFlags.GranFlags.Light) 
1097       GranSimLight_leave_system(event, &ActiveTSO); 
1098
1099     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1100
1101     IF_DEBUG(gran, 
1102              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1103
1104     /* in a GranSim setup the TSO stays on the run queue */
1105     t = CurrentTSO;
1106     /* Take a thread from the run queue. */
1107     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1108
1109     IF_DEBUG(gran, 
1110              debugBelch("GRAN: About to run current thread, which is\n");
1111              G_TSO(t,5));
1112
1113     context_switch = 0; // turned on via GranYield, checking events and time slice
1114
1115     IF_DEBUG(gran, 
1116              DumpGranEvent(GR_SCHEDULE, t));
1117
1118     procStatus[CurrentProc] = Busy;
1119 }
1120 #endif // GRAN
1121
1122 /* ----------------------------------------------------------------------------
1123  * Send pending messages (PARALLEL_HASKELL only)
1124  * ------------------------------------------------------------------------- */
1125
1126 #if defined(PARALLEL_HASKELL)
1127 static StgTSO *
1128 scheduleSendPendingMessages(void)
1129 {
1130     StgSparkPool *pool;
1131     rtsSpark spark;
1132     StgTSO *t;
1133
1134 # if defined(PAR) // global Mem.Mgmt., omit for now
1135     if (PendingFetches != END_BF_QUEUE) {
1136         processFetches();
1137     }
1138 # endif
1139     
1140     if (RtsFlags.ParFlags.BufferTime) {
1141         // if we use message buffering, we must send away all message
1142         // packets which have become too old...
1143         sendOldBuffers(); 
1144     }
1145 }
1146 #endif
1147
1148 /* ----------------------------------------------------------------------------
1149  * Activate spark threads (PARALLEL_HASKELL only)
1150  * ------------------------------------------------------------------------- */
1151
1152 #if defined(PARALLEL_HASKELL)
1153 static void
1154 scheduleActivateSpark(void)
1155 {
1156 #if defined(SPARKS)
1157   ASSERT(emptyRunQueue());
1158 /* We get here if the run queue is empty and want some work.
1159    We try to turn a spark into a thread, and add it to the run queue,
1160    from where it will be picked up in the next iteration of the scheduler
1161    loop.
1162 */
1163
1164       /* :-[  no local threads => look out for local sparks */
1165       /* the spark pool for the current PE */
1166       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1167       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1168           pool->hd < pool->tl) {
1169         /* 
1170          * ToDo: add GC code check that we really have enough heap afterwards!!
1171          * Old comment:
1172          * If we're here (no runnable threads) and we have pending
1173          * sparks, we must have a space problem.  Get enough space
1174          * to turn one of those pending sparks into a
1175          * thread... 
1176          */
1177
1178         spark = findSpark(rtsFalse);            /* get a spark */
1179         if (spark != (rtsSpark) NULL) {
1180           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1181           IF_PAR_DEBUG(fish, // schedule,
1182                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1183                              tso->id, tso, advisory_thread_count));
1184
1185           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1186             IF_PAR_DEBUG(fish, // schedule,
1187                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1188                             spark));
1189             return rtsFalse; /* failed to generate a thread */
1190           }                  /* otherwise fall through & pick-up new tso */
1191         } else {
1192           IF_PAR_DEBUG(fish, // schedule,
1193                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1194                              spark_queue_len(pool)));
1195           return rtsFalse;  /* failed to generate a thread */
1196         }
1197         return rtsTrue;  /* success in generating a thread */
1198   } else { /* no more threads permitted or pool empty */
1199     return rtsFalse;  /* failed to generateThread */
1200   }
1201 #else
1202   tso = NULL; // avoid compiler warning only
1203   return rtsFalse;  /* dummy in non-PAR setup */
1204 #endif // SPARKS
1205 }
1206 #endif // PARALLEL_HASKELL
1207
1208 /* ----------------------------------------------------------------------------
1209  * Get work from a remote node (PARALLEL_HASKELL only)
1210  * ------------------------------------------------------------------------- */
1211     
1212 #if defined(PARALLEL_HASKELL)
1213 static rtsBool
1214 scheduleGetRemoteWork(rtsBool *receivedFinish)
1215 {
1216   ASSERT(emptyRunQueue());
1217
1218   if (RtsFlags.ParFlags.BufferTime) {
1219         IF_PAR_DEBUG(verbose, 
1220                 debugBelch("...send all pending data,"));
1221         {
1222           nat i;
1223           for (i=1; i<=nPEs; i++)
1224             sendImmediately(i); // send all messages away immediately
1225         }
1226   }
1227 # ifndef SPARKS
1228         //++EDEN++ idle() , i.e. send all buffers, wait for work
1229         // suppress fishing in EDEN... just look for incoming messages
1230         // (blocking receive)
1231   IF_PAR_DEBUG(verbose, 
1232                debugBelch("...wait for incoming messages...\n"));
1233   *receivedFinish = processMessages(); // blocking receive...
1234
1235         // and reenter scheduling loop after having received something
1236         // (return rtsFalse below)
1237
1238 # else /* activate SPARKS machinery */
1239 /* We get here, if we have no work, tried to activate a local spark, but still
1240    have no work. We try to get a remote spark, by sending a FISH message.
1241    Thread migration should be added here, and triggered when a sequence of 
1242    fishes returns without work. */
1243         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1244
1245       /* =8-[  no local sparks => look for work on other PEs */
1246         /*
1247          * We really have absolutely no work.  Send out a fish
1248          * (there may be some out there already), and wait for
1249          * something to arrive.  We clearly can't run any threads
1250          * until a SCHEDULE or RESUME arrives, and so that's what
1251          * we're hoping to see.  (Of course, we still have to
1252          * respond to other types of messages.)
1253          */
1254         rtsTime now = msTime() /*CURRENT_TIME*/;
1255         IF_PAR_DEBUG(verbose, 
1256                      debugBelch("--  now=%ld\n", now));
1257         IF_PAR_DEBUG(fish, // verbose,
1258              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1259                  (last_fish_arrived_at!=0 &&
1260                   last_fish_arrived_at+delay > now)) {
1261                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1262                      now, last_fish_arrived_at+delay, 
1263                      last_fish_arrived_at,
1264                      delay);
1265              });
1266   
1267         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1268             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1269           if (last_fish_arrived_at==0 ||
1270               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1271             /* outstandingFishes is set in sendFish, processFish;
1272                avoid flooding system with fishes via delay */
1273     next_fish_to_send_at = 0;  
1274   } else {
1275     /* ToDo: this should be done in the main scheduling loop to avoid the
1276              busy wait here; not so bad if fish delay is very small  */
1277     int iq = 0; // DEBUGGING -- HWL
1278     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1279     /* send a fish when ready, but process messages that arrive in the meantime */
1280     do {
1281       if (PacketsWaiting()) {
1282         iq++; // DEBUGGING
1283         *receivedFinish = processMessages();
1284       }
1285       now = msTime();
1286     } while (!*receivedFinish || now<next_fish_to_send_at);
1287     // JB: This means the fish could become obsolete, if we receive
1288     // work. Better check for work again? 
1289     // last line: while (!receivedFinish || !haveWork || now<...)
1290     // next line: if (receivedFinish || haveWork )
1291
1292     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1293       return rtsFalse;  // NB: this will leave scheduler loop
1294                         // immediately after return!
1295                           
1296     IF_PAR_DEBUG(fish, // verbose,
1297                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1298
1299   }
1300
1301     // JB: IMHO, this should all be hidden inside sendFish(...)
1302     /* pe = choosePE(); 
1303        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1304                 NEW_FISH_HUNGER);
1305
1306     // Global statistics: count no. of fishes
1307     if (RtsFlags.ParFlags.ParStats.Global &&
1308          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1309            globalParStats.tot_fish_mess++;
1310            }
1311     */ 
1312
1313   /* delayed fishes must have been sent by now! */
1314   next_fish_to_send_at = 0;  
1315   }
1316       
1317   *receivedFinish = processMessages();
1318 # endif /* SPARKS */
1319
1320  return rtsFalse;
1321  /* NB: this function always returns rtsFalse, meaning the scheduler
1322     loop continues with the next iteration; 
1323     rationale: 
1324       return code means success in finding work; we enter this function
1325       if there is no local work, thus have to send a fish which takes
1326       time until it arrives with work; in the meantime we should process
1327       messages in the main loop;
1328  */
1329 }
1330 #endif // PARALLEL_HASKELL
1331
1332 /* ----------------------------------------------------------------------------
1333  * PAR/GRAN: Report stats & debugging info(?)
1334  * ------------------------------------------------------------------------- */
1335
1336 #if defined(PAR) || defined(GRAN)
1337 static void
1338 scheduleGranParReport(void)
1339 {
1340   ASSERT(run_queue_hd != END_TSO_QUEUE);
1341
1342   /* Take a thread from the run queue, if we have work */
1343   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1344
1345     /* If this TSO has got its outport closed in the meantime, 
1346      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1347      * It has to be marked as TH_DEAD for this purpose.
1348      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1349
1350 JB: TODO: investigate wether state change field could be nuked
1351      entirely and replaced by the normal tso state (whatnext
1352      field). All we want to do is to kill tsos from outside.
1353      */
1354
1355     /* ToDo: write something to the log-file
1356     if (RTSflags.ParFlags.granSimStats && !sameThread)
1357         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1358
1359     CurrentTSO = t;
1360     */
1361     /* the spark pool for the current PE */
1362     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1363
1364     IF_DEBUG(scheduler, 
1365              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1366                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1367
1368     IF_PAR_DEBUG(fish,
1369              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1370                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1371
1372     if (RtsFlags.ParFlags.ParStats.Full && 
1373         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1374         (emitSchedule || // forced emit
1375          (t && LastTSO && t->id != LastTSO->id))) {
1376       /* 
1377          we are running a different TSO, so write a schedule event to log file
1378          NB: If we use fair scheduling we also have to write  a deschedule 
1379              event for LastTSO; with unfair scheduling we know that the
1380              previous tso has blocked whenever we switch to another tso, so
1381              we don't need it in GUM for now
1382       */
1383       IF_PAR_DEBUG(fish, // schedule,
1384                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1385
1386       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1387                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1388       emitSchedule = rtsFalse;
1389     }
1390 }     
1391 #endif
1392
1393 /* ----------------------------------------------------------------------------
1394  * After running a thread...
1395  * ------------------------------------------------------------------------- */
1396
1397 static void
1398 schedulePostRunThread(void)
1399 {
1400 #if defined(PAR)
1401     /* HACK 675: if the last thread didn't yield, make sure to print a 
1402        SCHEDULE event to the log file when StgRunning the next thread, even
1403        if it is the same one as before */
1404     LastTSO = t; 
1405     TimeOfLastYield = CURRENT_TIME;
1406 #endif
1407
1408   /* some statistics gathering in the parallel case */
1409
1410 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1411   switch (ret) {
1412     case HeapOverflow:
1413 # if defined(GRAN)
1414       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1415       globalGranStats.tot_heapover++;
1416 # elif defined(PAR)
1417       globalParStats.tot_heapover++;
1418 # endif
1419       break;
1420
1421      case StackOverflow:
1422 # if defined(GRAN)
1423       IF_DEBUG(gran, 
1424                DumpGranEvent(GR_DESCHEDULE, t));
1425       globalGranStats.tot_stackover++;
1426 # elif defined(PAR)
1427       // IF_DEBUG(par, 
1428       // DumpGranEvent(GR_DESCHEDULE, t);
1429       globalParStats.tot_stackover++;
1430 # endif
1431       break;
1432
1433     case ThreadYielding:
1434 # if defined(GRAN)
1435       IF_DEBUG(gran, 
1436                DumpGranEvent(GR_DESCHEDULE, t));
1437       globalGranStats.tot_yields++;
1438 # elif defined(PAR)
1439       // IF_DEBUG(par, 
1440       // DumpGranEvent(GR_DESCHEDULE, t);
1441       globalParStats.tot_yields++;
1442 # endif
1443       break; 
1444
1445     case ThreadBlocked:
1446 # if defined(GRAN)
1447       IF_DEBUG(scheduler,
1448                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1449                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1450                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1451                if (t->block_info.closure!=(StgClosure*)NULL)
1452                  print_bq(t->block_info.closure);
1453                debugBelch("\n"));
1454
1455       // ??? needed; should emit block before
1456       IF_DEBUG(gran, 
1457                DumpGranEvent(GR_DESCHEDULE, t)); 
1458       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1459       /*
1460         ngoq Dogh!
1461       ASSERT(procStatus[CurrentProc]==Busy || 
1462               ((procStatus[CurrentProc]==Fetching) && 
1463               (t->block_info.closure!=(StgClosure*)NULL)));
1464       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1465           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1466             procStatus[CurrentProc]==Fetching)) 
1467         procStatus[CurrentProc] = Idle;
1468       */
1469 # elif defined(PAR)
1470 //++PAR++  blockThread() writes the event (change?)
1471 # endif
1472     break;
1473
1474   case ThreadFinished:
1475     break;
1476
1477   default:
1478     barf("parGlobalStats: unknown return code");
1479     break;
1480     }
1481 #endif
1482 }
1483
1484 /* -----------------------------------------------------------------------------
1485  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1486  * -------------------------------------------------------------------------- */
1487
1488 static rtsBool
1489 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1490 {
1491     // did the task ask for a large block?
1492     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1493         // if so, get one and push it on the front of the nursery.
1494         bdescr *bd;
1495         lnat blocks;
1496         
1497         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1498         
1499         IF_DEBUG(scheduler,
1500                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1501                             (long)t->id, whatNext_strs[t->what_next], blocks));
1502         
1503         // don't do this if the nursery is (nearly) full, we'll GC first.
1504         if (cap->r.rCurrentNursery->link != NULL ||
1505             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1506                                                // if the nursery has only one block.
1507             
1508             ACQUIRE_SM_LOCK
1509             bd = allocGroup( blocks );
1510             RELEASE_SM_LOCK
1511             cap->r.rNursery->n_blocks += blocks;
1512             
1513             // link the new group into the list
1514             bd->link = cap->r.rCurrentNursery;
1515             bd->u.back = cap->r.rCurrentNursery->u.back;
1516             if (cap->r.rCurrentNursery->u.back != NULL) {
1517                 cap->r.rCurrentNursery->u.back->link = bd;
1518             } else {
1519 #if !defined(THREADED_RTS)
1520                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1521                        g0s0 == cap->r.rNursery);
1522 #endif
1523                 cap->r.rNursery->blocks = bd;
1524             }             
1525             cap->r.rCurrentNursery->u.back = bd;
1526             
1527             // initialise it as a nursery block.  We initialise the
1528             // step, gen_no, and flags field of *every* sub-block in
1529             // this large block, because this is easier than making
1530             // sure that we always find the block head of a large
1531             // block whenever we call Bdescr() (eg. evacuate() and
1532             // isAlive() in the GC would both have to do this, at
1533             // least).
1534             { 
1535                 bdescr *x;
1536                 for (x = bd; x < bd + blocks; x++) {
1537                     x->step = cap->r.rNursery;
1538                     x->gen_no = 0;
1539                     x->flags = 0;
1540                 }
1541             }
1542             
1543             // This assert can be a killer if the app is doing lots
1544             // of large block allocations.
1545             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1546             
1547             // now update the nursery to point to the new block
1548             cap->r.rCurrentNursery = bd;
1549             
1550             // we might be unlucky and have another thread get on the
1551             // run queue before us and steal the large block, but in that
1552             // case the thread will just end up requesting another large
1553             // block.
1554             pushOnRunQueue(cap,t);
1555             return rtsFalse;  /* not actually GC'ing */
1556         }
1557     }
1558     
1559     IF_DEBUG(scheduler,
1560              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1561                         (long)t->id, whatNext_strs[t->what_next]));
1562 #if defined(GRAN)
1563     ASSERT(!is_on_queue(t,CurrentProc));
1564 #elif defined(PARALLEL_HASKELL)
1565     /* Currently we emit a DESCHEDULE event before GC in GUM.
1566        ToDo: either add separate event to distinguish SYSTEM time from rest
1567        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1568     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1569         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1570                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1571         emitSchedule = rtsTrue;
1572     }
1573 #endif
1574       
1575     pushOnRunQueue(cap,t);
1576     return rtsTrue;
1577     /* actual GC is done at the end of the while loop in schedule() */
1578 }
1579
1580 /* -----------------------------------------------------------------------------
1581  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1582  * -------------------------------------------------------------------------- */
1583
1584 static void
1585 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1586 {
1587     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1588                                   (long)t->id, whatNext_strs[t->what_next]));
1589     /* just adjust the stack for this thread, then pop it back
1590      * on the run queue.
1591      */
1592     { 
1593         /* enlarge the stack */
1594         StgTSO *new_t = threadStackOverflow(cap, t);
1595         
1596         /* The TSO attached to this Task may have moved, so update the
1597          * pointer to it.
1598          */
1599         if (task->tso == t) {
1600             task->tso = new_t;
1601         }
1602         pushOnRunQueue(cap,new_t);
1603     }
1604 }
1605
1606 /* -----------------------------------------------------------------------------
1607  * Handle a thread that returned to the scheduler with ThreadYielding
1608  * -------------------------------------------------------------------------- */
1609
1610 static rtsBool
1611 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1612 {
1613     // Reset the context switch flag.  We don't do this just before
1614     // running the thread, because that would mean we would lose ticks
1615     // during GC, which can lead to unfair scheduling (a thread hogs
1616     // the CPU because the tick always arrives during GC).  This way
1617     // penalises threads that do a lot of allocation, but that seems
1618     // better than the alternative.
1619     context_switch = 0;
1620     
1621     /* put the thread back on the run queue.  Then, if we're ready to
1622      * GC, check whether this is the last task to stop.  If so, wake
1623      * up the GC thread.  getThread will block during a GC until the
1624      * GC is finished.
1625      */
1626     IF_DEBUG(scheduler,
1627              if (t->what_next != prev_what_next) {
1628                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1629                             (long)t->id, whatNext_strs[t->what_next]);
1630              } else {
1631                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1632                             (long)t->id, whatNext_strs[t->what_next]);
1633              }
1634         );
1635     
1636     IF_DEBUG(sanity,
1637              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1638              checkTSO(t));
1639     ASSERT(t->link == END_TSO_QUEUE);
1640     
1641     // Shortcut if we're just switching evaluators: don't bother
1642     // doing stack squeezing (which can be expensive), just run the
1643     // thread.
1644     if (t->what_next != prev_what_next) {
1645         return rtsTrue;
1646     }
1647     
1648 #if defined(GRAN)
1649     ASSERT(!is_on_queue(t,CurrentProc));
1650       
1651     IF_DEBUG(sanity,
1652              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1653              checkThreadQsSanity(rtsTrue));
1654
1655 #endif
1656
1657     addToRunQueue(cap,t);
1658
1659 #if defined(GRAN)
1660     /* add a ContinueThread event to actually process the thread */
1661     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1662               ContinueThread,
1663               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1664     IF_GRAN_DEBUG(bq, 
1665                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1666                   G_EVENTQ(0);
1667                   G_CURR_THREADQ(0));
1668 #endif
1669     return rtsFalse;
1670 }
1671
1672 /* -----------------------------------------------------------------------------
1673  * Handle a thread that returned to the scheduler with ThreadBlocked
1674  * -------------------------------------------------------------------------- */
1675
1676 static void
1677 scheduleHandleThreadBlocked( StgTSO *t
1678 #if !defined(GRAN) && !defined(DEBUG)
1679     STG_UNUSED
1680 #endif
1681     )
1682 {
1683 #if defined(GRAN)
1684     IF_DEBUG(scheduler,
1685              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1686                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1687              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1688     
1689     // ??? needed; should emit block before
1690     IF_DEBUG(gran, 
1691              DumpGranEvent(GR_DESCHEDULE, t)); 
1692     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1693     /*
1694       ngoq Dogh!
1695       ASSERT(procStatus[CurrentProc]==Busy || 
1696       ((procStatus[CurrentProc]==Fetching) && 
1697       (t->block_info.closure!=(StgClosure*)NULL)));
1698       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1699       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1700       procStatus[CurrentProc]==Fetching)) 
1701       procStatus[CurrentProc] = Idle;
1702     */
1703 #elif defined(PAR)
1704     IF_DEBUG(scheduler,
1705              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1706                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1707     IF_PAR_DEBUG(bq,
1708                  
1709                  if (t->block_info.closure!=(StgClosure*)NULL) 
1710                  print_bq(t->block_info.closure));
1711     
1712     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1713     blockThread(t);
1714     
1715     /* whatever we schedule next, we must log that schedule */
1716     emitSchedule = rtsTrue;
1717     
1718 #else /* !GRAN */
1719
1720       // We don't need to do anything.  The thread is blocked, and it
1721       // has tidied up its stack and placed itself on whatever queue
1722       // it needs to be on.
1723
1724 #if !defined(THREADED_RTS)
1725     ASSERT(t->why_blocked != NotBlocked);
1726              // This might not be true under THREADED_RTS: we don't have
1727              // exclusive access to this TSO, so someone might have
1728              // woken it up by now.  This actually happens: try
1729              // conc023 +RTS -N2.
1730 #endif
1731
1732     IF_DEBUG(scheduler,
1733              debugBelch("--<< thread %d (%s) stopped: ", 
1734                         t->id, whatNext_strs[t->what_next]);
1735              printThreadBlockage(t);
1736              debugBelch("\n"));
1737     
1738     /* Only for dumping event to log file 
1739        ToDo: do I need this in GranSim, too?
1740        blockThread(t);
1741     */
1742 #endif
1743 }
1744
1745 /* -----------------------------------------------------------------------------
1746  * Handle a thread that returned to the scheduler with ThreadFinished
1747  * -------------------------------------------------------------------------- */
1748
1749 static rtsBool
1750 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1751 {
1752     /* Need to check whether this was a main thread, and if so,
1753      * return with the return value.
1754      *
1755      * We also end up here if the thread kills itself with an
1756      * uncaught exception, see Exception.cmm.
1757      */
1758     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1759                                   t->id, whatNext_strs[t->what_next]));
1760
1761 #if defined(GRAN)
1762       endThread(t, CurrentProc); // clean-up the thread
1763 #elif defined(PARALLEL_HASKELL)
1764       /* For now all are advisory -- HWL */
1765       //if(t->priority==AdvisoryPriority) ??
1766       advisory_thread_count--; // JB: Caution with this counter, buggy!
1767       
1768 # if defined(DIST)
1769       if(t->dist.priority==RevalPriority)
1770         FinishReval(t);
1771 # endif
1772     
1773 # if defined(EDENOLD)
1774       // the thread could still have an outport... (BUG)
1775       if (t->eden.outport != -1) {
1776       // delete the outport for the tso which has finished...
1777         IF_PAR_DEBUG(eden_ports,
1778                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1779                               t->eden.outport, t->id));
1780         deleteOPT(t);
1781       }
1782       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1783       if (t->eden.epid != -1) {
1784         IF_PAR_DEBUG(eden_ports,
1785                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1786                            t->id, t->eden.epid));
1787         removeTSOfromProcess(t);
1788       }
1789 # endif 
1790
1791 # if defined(PAR)
1792       if (RtsFlags.ParFlags.ParStats.Full &&
1793           !RtsFlags.ParFlags.ParStats.Suppressed) 
1794         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1795
1796       //  t->par only contains statistics: left out for now...
1797       IF_PAR_DEBUG(fish,
1798                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1799                               t->id,t,t->par.sparkname));
1800 # endif
1801 #endif // PARALLEL_HASKELL
1802
1803       //
1804       // Check whether the thread that just completed was a bound
1805       // thread, and if so return with the result.  
1806       //
1807       // There is an assumption here that all thread completion goes
1808       // through this point; we need to make sure that if a thread
1809       // ends up in the ThreadKilled state, that it stays on the run
1810       // queue so it can be dealt with here.
1811       //
1812
1813       if (t->bound) {
1814
1815           if (t->bound != task) {
1816 #if !defined(THREADED_RTS)
1817               // Must be a bound thread that is not the topmost one.  Leave
1818               // it on the run queue until the stack has unwound to the
1819               // point where we can deal with this.  Leaving it on the run
1820               // queue also ensures that the garbage collector knows about
1821               // this thread and its return value (it gets dropped from the
1822               // all_threads list so there's no other way to find it).
1823               appendToRunQueue(cap,t);
1824               return rtsFalse;
1825 #else
1826               // this cannot happen in the threaded RTS, because a
1827               // bound thread can only be run by the appropriate Task.
1828               barf("finished bound thread that isn't mine");
1829 #endif
1830           }
1831
1832           ASSERT(task->tso == t);
1833
1834           if (t->what_next == ThreadComplete) {
1835               if (task->ret) {
1836                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1837                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1838               }
1839               task->stat = Success;
1840           } else {
1841               if (task->ret) {
1842                   *(task->ret) = NULL;
1843               }
1844               if (interrupted) {
1845                   task->stat = Interrupted;
1846               } else {
1847                   task->stat = Killed;
1848               }
1849           }
1850 #ifdef DEBUG
1851           removeThreadLabel((StgWord)task->tso->id);
1852 #endif
1853           return rtsTrue; // tells schedule() to return
1854       }
1855
1856       return rtsFalse;
1857 }
1858
1859 /* -----------------------------------------------------------------------------
1860  * Perform a heap census, if PROFILING
1861  * -------------------------------------------------------------------------- */
1862
1863 static rtsBool
1864 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1865 {
1866 #if defined(PROFILING)
1867     // When we have +RTS -i0 and we're heap profiling, do a census at
1868     // every GC.  This lets us get repeatable runs for debugging.
1869     if (performHeapProfile ||
1870         (RtsFlags.ProfFlags.profileInterval==0 &&
1871          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1872
1873         // checking black holes is necessary before GC, otherwise
1874         // there may be threads that are unreachable except by the
1875         // blackhole queue, which the GC will consider to be
1876         // deadlocked.
1877         scheduleCheckBlackHoles(&MainCapability);
1878
1879         IF_DEBUG(scheduler, sched_belch("garbage collecting before heap census"));
1880         GarbageCollect(GetRoots, rtsTrue);
1881
1882         IF_DEBUG(scheduler, sched_belch("performing heap census"));
1883         heapCensus();
1884
1885         performHeapProfile = rtsFalse;
1886         return rtsTrue;  // true <=> we already GC'd
1887     }
1888 #endif
1889     return rtsFalse;
1890 }
1891
1892 /* -----------------------------------------------------------------------------
1893  * Perform a garbage collection if necessary
1894  * -------------------------------------------------------------------------- */
1895
1896 static void
1897 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
1898               rtsBool force_major, void (*get_roots)(evac_fn))
1899 {
1900     StgTSO *t;
1901 #ifdef THREADED_RTS
1902     static volatile StgWord waiting_for_gc;
1903     rtsBool was_waiting;
1904     nat i;
1905 #endif
1906
1907 #ifdef THREADED_RTS
1908     // In order to GC, there must be no threads running Haskell code.
1909     // Therefore, the GC thread needs to hold *all* the capabilities,
1910     // and release them after the GC has completed.  
1911     //
1912     // This seems to be the simplest way: previous attempts involved
1913     // making all the threads with capabilities give up their
1914     // capabilities and sleep except for the *last* one, which
1915     // actually did the GC.  But it's quite hard to arrange for all
1916     // the other tasks to sleep and stay asleep.
1917     //
1918         
1919     was_waiting = cas(&waiting_for_gc, 0, 1);
1920     if (was_waiting) {
1921         do {
1922             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1923             if (cap) yieldCapability(&cap,task);
1924         } while (waiting_for_gc);
1925         return;  // NOTE: task->cap might have changed here
1926     }
1927
1928     for (i=0; i < n_capabilities; i++) {
1929         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1930         if (cap != &capabilities[i]) {
1931             Capability *pcap = &capabilities[i];
1932             // we better hope this task doesn't get migrated to
1933             // another Capability while we're waiting for this one.
1934             // It won't, because load balancing happens while we have
1935             // all the Capabilities, but even so it's a slightly
1936             // unsavoury invariant.
1937             task->cap = pcap;
1938             context_switch = 1;
1939             waitForReturnCapability(&pcap, task);
1940             if (pcap != &capabilities[i]) {
1941                 barf("scheduleDoGC: got the wrong capability");
1942             }
1943         }
1944     }
1945
1946     waiting_for_gc = rtsFalse;
1947 #endif
1948
1949     /* Kick any transactions which are invalid back to their
1950      * atomically frames.  When next scheduled they will try to
1951      * commit, this commit will fail and they will retry.
1952      */
1953     { 
1954         StgTSO *next;
1955
1956         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1957             if (t->what_next == ThreadRelocated) {
1958                 next = t->link;
1959             } else {
1960                 next = t->global_link;
1961                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1962                     if (!stmValidateNestOfTransactions (t -> trec)) {
1963                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1964                         
1965                         // strip the stack back to the
1966                         // ATOMICALLY_FRAME, aborting the (nested)
1967                         // transaction, and saving the stack of any
1968                         // partially-evaluated thunks on the heap.
1969                         raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL);
1970                         
1971 #ifdef REG_R1
1972                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1973 #endif
1974                     }
1975                 }
1976             }
1977         }
1978     }
1979     
1980     // so this happens periodically:
1981     if (cap) scheduleCheckBlackHoles(cap);
1982     
1983     IF_DEBUG(scheduler, printAllThreads());
1984
1985     /* everybody back, start the GC.
1986      * Could do it in this thread, or signal a condition var
1987      * to do it in another thread.  Either way, we need to
1988      * broadcast on gc_pending_cond afterward.
1989      */
1990 #if defined(THREADED_RTS)
1991     IF_DEBUG(scheduler,sched_belch("doing GC"));
1992 #endif
1993     GarbageCollect(get_roots, force_major);
1994     
1995 #if defined(THREADED_RTS)
1996     // release our stash of capabilities.
1997     for (i = 0; i < n_capabilities; i++) {
1998         if (cap != &capabilities[i]) {
1999             task->cap = &capabilities[i];
2000             releaseCapability(&capabilities[i]);
2001         }
2002     }
2003     if (cap) {
2004         task->cap = cap;
2005     } else {
2006         task->cap = NULL;
2007     }
2008 #endif
2009
2010 #if defined(GRAN)
2011     /* add a ContinueThread event to continue execution of current thread */
2012     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2013               ContinueThread,
2014               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2015     IF_GRAN_DEBUG(bq, 
2016                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2017                   G_EVENTQ(0);
2018                   G_CURR_THREADQ(0));
2019 #endif /* GRAN */
2020 }
2021
2022 /* ---------------------------------------------------------------------------
2023  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
2024  * used by Control.Concurrent for error checking.
2025  * ------------------------------------------------------------------------- */
2026  
2027 StgBool
2028 rtsSupportsBoundThreads(void)
2029 {
2030 #if defined(THREADED_RTS)
2031   return rtsTrue;
2032 #else
2033   return rtsFalse;
2034 #endif
2035 }
2036
2037 /* ---------------------------------------------------------------------------
2038  * isThreadBound(tso): check whether tso is bound to an OS thread.
2039  * ------------------------------------------------------------------------- */
2040  
2041 StgBool
2042 isThreadBound(StgTSO* tso USED_IF_THREADS)
2043 {
2044 #if defined(THREADED_RTS)
2045   return (tso->bound != NULL);
2046 #endif
2047   return rtsFalse;
2048 }
2049
2050 /* ---------------------------------------------------------------------------
2051  * Singleton fork(). Do not copy any running threads.
2052  * ------------------------------------------------------------------------- */
2053
2054 #if !defined(mingw32_HOST_OS)
2055 #define FORKPROCESS_PRIMOP_SUPPORTED
2056 #endif
2057
2058 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2059 static void 
2060 deleteThreadImmediately(Capability *cap, StgTSO *tso);
2061 #endif
2062 StgInt
2063 forkProcess(HsStablePtr *entry
2064 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2065             STG_UNUSED
2066 #endif
2067            )
2068 {
2069 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2070     Task *task;
2071     pid_t pid;
2072     StgTSO* t,*next;
2073     Capability *cap;
2074     
2075 #if defined(THREADED_RTS)
2076     if (RtsFlags.ParFlags.nNodes > 1) {
2077         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2078         stg_exit(EXIT_FAILURE);
2079     }
2080 #endif
2081
2082     IF_DEBUG(scheduler,sched_belch("forking!"));
2083     
2084     // ToDo: for SMP, we should probably acquire *all* the capabilities
2085     cap = rts_lock();
2086     
2087     pid = fork();
2088     
2089     if (pid) { // parent
2090         
2091         // just return the pid
2092         rts_unlock(cap);
2093         return pid;
2094         
2095     } else { // child
2096         
2097         // delete all threads
2098         cap->run_queue_hd = END_TSO_QUEUE;
2099         cap->run_queue_tl = END_TSO_QUEUE;
2100         
2101         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2102             next = t->link;
2103             
2104             // don't allow threads to catch the ThreadKilled exception
2105             deleteThreadImmediately(cap,t);
2106         }
2107         
2108         // wipe the task list
2109         ACQUIRE_LOCK(&sched_mutex);
2110         for (task = all_tasks; task != NULL; task=task->all_link) {
2111             if (task != cap->running_task) discardTask(task);
2112         }
2113         RELEASE_LOCK(&sched_mutex);
2114
2115         cap->suspended_ccalling_tasks = NULL;
2116
2117 #if defined(THREADED_RTS)
2118         // wipe our spare workers list.
2119         cap->spare_workers = NULL;
2120         cap->returning_tasks_hd = NULL;
2121         cap->returning_tasks_tl = NULL;
2122 #endif
2123
2124         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2125         rts_checkSchedStatus("forkProcess",cap);
2126         
2127         rts_unlock(cap);
2128         hs_exit();                      // clean up and exit
2129         stg_exit(EXIT_SUCCESS);
2130     }
2131 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2132     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2133     return -1;
2134 #endif
2135 }
2136
2137 /* ---------------------------------------------------------------------------
2138  * Delete the threads on the run queue of the current capability.
2139  * ------------------------------------------------------------------------- */
2140    
2141 static void
2142 deleteRunQueue (Capability *cap)
2143 {
2144     StgTSO *t, *next;
2145     for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2146         ASSERT(t->what_next != ThreadRelocated);
2147         next = t->link;
2148         deleteThread(cap, t);
2149     }
2150 }
2151
2152 /* startThread and  insertThread are now in GranSim.c -- HWL */
2153
2154
2155 /* -----------------------------------------------------------------------------
2156    Managing the suspended_ccalling_tasks list.
2157    Locks required: sched_mutex
2158    -------------------------------------------------------------------------- */
2159
2160 STATIC_INLINE void
2161 suspendTask (Capability *cap, Task *task)
2162 {
2163     ASSERT(task->next == NULL && task->prev == NULL);
2164     task->next = cap->suspended_ccalling_tasks;
2165     task->prev = NULL;
2166     if (cap->suspended_ccalling_tasks) {
2167         cap->suspended_ccalling_tasks->prev = task;
2168     }
2169     cap->suspended_ccalling_tasks = task;
2170 }
2171
2172 STATIC_INLINE void
2173 recoverSuspendedTask (Capability *cap, Task *task)
2174 {
2175     if (task->prev) {
2176         task->prev->next = task->next;
2177     } else {
2178         ASSERT(cap->suspended_ccalling_tasks == task);
2179         cap->suspended_ccalling_tasks = task->next;
2180     }
2181     if (task->next) {
2182         task->next->prev = task->prev;
2183     }
2184     task->next = task->prev = NULL;
2185 }
2186
2187 /* ---------------------------------------------------------------------------
2188  * Suspending & resuming Haskell threads.
2189  * 
2190  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2191  * its capability before calling the C function.  This allows another
2192  * task to pick up the capability and carry on running Haskell
2193  * threads.  It also means that if the C call blocks, it won't lock
2194  * the whole system.
2195  *
2196  * The Haskell thread making the C call is put to sleep for the
2197  * duration of the call, on the susepended_ccalling_threads queue.  We
2198  * give out a token to the task, which it can use to resume the thread
2199  * on return from the C function.
2200  * ------------------------------------------------------------------------- */
2201    
2202 void *
2203 suspendThread (StgRegTable *reg)
2204 {
2205   Capability *cap;
2206   int saved_errno = errno;
2207   StgTSO *tso;
2208   Task *task;
2209
2210   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2211    */
2212   cap = regTableToCapability(reg);
2213
2214   task = cap->running_task;
2215   tso = cap->r.rCurrentTSO;
2216
2217   IF_DEBUG(scheduler,
2218            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2219
2220   // XXX this might not be necessary --SDM
2221   tso->what_next = ThreadRunGHC;
2222
2223   threadPaused(cap,tso);
2224
2225   if(tso->blocked_exceptions == NULL)  {
2226       tso->why_blocked = BlockedOnCCall;
2227       tso->blocked_exceptions = END_TSO_QUEUE;
2228   } else {
2229       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2230   }
2231
2232   // Hand back capability
2233   task->suspended_tso = tso;
2234
2235   ACQUIRE_LOCK(&cap->lock);
2236
2237   suspendTask(cap,task);
2238   cap->in_haskell = rtsFalse;
2239   releaseCapability_(cap);
2240   
2241   RELEASE_LOCK(&cap->lock);
2242
2243 #if defined(THREADED_RTS)
2244   /* Preparing to leave the RTS, so ensure there's a native thread/task
2245      waiting to take over.
2246   */
2247   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2248 #endif
2249
2250   errno = saved_errno;
2251   return task;
2252 }
2253
2254 StgRegTable *
2255 resumeThread (void *task_)
2256 {
2257     StgTSO *tso;
2258     Capability *cap;
2259     int saved_errno = errno;
2260     Task *task = task_;
2261
2262     cap = task->cap;
2263     // Wait for permission to re-enter the RTS with the result.
2264     waitForReturnCapability(&cap,task);
2265     // we might be on a different capability now... but if so, our
2266     // entry on the suspended_ccalling_tasks list will also have been
2267     // migrated.
2268
2269     // Remove the thread from the suspended list
2270     recoverSuspendedTask(cap,task);
2271
2272     tso = task->suspended_tso;
2273     task->suspended_tso = NULL;
2274     tso->link = END_TSO_QUEUE;
2275     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2276     
2277     if (tso->why_blocked == BlockedOnCCall) {
2278         awakenBlockedQueue(cap,tso->blocked_exceptions);
2279         tso->blocked_exceptions = NULL;
2280     }
2281     
2282     /* Reset blocking status */
2283     tso->why_blocked  = NotBlocked;
2284     
2285     cap->r.rCurrentTSO = tso;
2286     cap->in_haskell = rtsTrue;
2287     errno = saved_errno;
2288
2289     /* We might have GC'd, mark the TSO dirty again */
2290     dirtyTSO(tso);
2291
2292     IF_DEBUG(sanity, checkTSO(tso));
2293
2294     return &cap->r;
2295 }
2296
2297 /* ---------------------------------------------------------------------------
2298  * Comparing Thread ids.
2299  *
2300  * This is used from STG land in the implementation of the
2301  * instances of Eq/Ord for ThreadIds.
2302  * ------------------------------------------------------------------------ */
2303
2304 int
2305 cmp_thread(StgPtr tso1, StgPtr tso2) 
2306
2307   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2308   StgThreadID id2 = ((StgTSO *)tso2)->id;
2309  
2310   if (id1 < id2) return (-1);
2311   if (id1 > id2) return 1;
2312   return 0;
2313 }
2314
2315 /* ---------------------------------------------------------------------------
2316  * Fetching the ThreadID from an StgTSO.
2317  *
2318  * This is used in the implementation of Show for ThreadIds.
2319  * ------------------------------------------------------------------------ */
2320 int
2321 rts_getThreadId(StgPtr tso) 
2322 {
2323   return ((StgTSO *)tso)->id;
2324 }
2325
2326 #ifdef DEBUG
2327 void
2328 labelThread(StgPtr tso, char *label)
2329 {
2330   int len;
2331   void *buf;
2332
2333   /* Caveat: Once set, you can only set the thread name to "" */
2334   len = strlen(label)+1;
2335   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2336   strncpy(buf,label,len);
2337   /* Update will free the old memory for us */
2338   updateThreadLabel(((StgTSO *)tso)->id,buf);
2339 }
2340 #endif /* DEBUG */
2341
2342 /* ---------------------------------------------------------------------------
2343    Create a new thread.
2344
2345    The new thread starts with the given stack size.  Before the
2346    scheduler can run, however, this thread needs to have a closure
2347    (and possibly some arguments) pushed on its stack.  See
2348    pushClosure() in Schedule.h.
2349
2350    createGenThread() and createIOThread() (in SchedAPI.h) are
2351    convenient packaged versions of this function.
2352
2353    currently pri (priority) is only used in a GRAN setup -- HWL
2354    ------------------------------------------------------------------------ */
2355 #if defined(GRAN)
2356 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2357 StgTSO *
2358 createThread(nat size, StgInt pri)
2359 #else
2360 StgTSO *
2361 createThread(Capability *cap, nat size)
2362 #endif
2363 {
2364     StgTSO *tso;
2365     nat stack_size;
2366
2367     /* sched_mutex is *not* required */
2368
2369     /* First check whether we should create a thread at all */
2370 #if defined(PARALLEL_HASKELL)
2371     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2372     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2373         threadsIgnored++;
2374         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2375                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2376         return END_TSO_QUEUE;
2377     }
2378     threadsCreated++;
2379 #endif
2380
2381 #if defined(GRAN)
2382     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2383 #endif
2384
2385     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2386
2387     /* catch ridiculously small stack sizes */
2388     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2389         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2390     }
2391
2392     stack_size = size - TSO_STRUCT_SIZEW;
2393     
2394     tso = (StgTSO *)allocateLocal(cap, size);
2395     TICK_ALLOC_TSO(stack_size, 0);
2396
2397     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2398 #if defined(GRAN)
2399     SET_GRAN_HDR(tso, ThisPE);
2400 #endif
2401
2402     // Always start with the compiled code evaluator
2403     tso->what_next = ThreadRunGHC;
2404
2405     tso->why_blocked  = NotBlocked;
2406     tso->blocked_exceptions = NULL;
2407     tso->flags = TSO_DIRTY;
2408     
2409     tso->saved_errno = 0;
2410     tso->bound = NULL;
2411     
2412     tso->stack_size     = stack_size;
2413     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2414                           - TSO_STRUCT_SIZEW;
2415     tso->sp             = (P_)&(tso->stack) + stack_size;
2416
2417     tso->trec = NO_TREC;
2418     
2419 #ifdef PROFILING
2420     tso->prof.CCCS = CCS_MAIN;
2421 #endif
2422     
2423   /* put a stop frame on the stack */
2424     tso->sp -= sizeofW(StgStopFrame);
2425     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2426     tso->link = END_TSO_QUEUE;
2427     
2428   // ToDo: check this
2429 #if defined(GRAN)
2430     /* uses more flexible routine in GranSim */
2431     insertThread(tso, CurrentProc);
2432 #else
2433     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2434      * from its creation
2435      */
2436 #endif
2437     
2438 #if defined(GRAN) 
2439     if (RtsFlags.GranFlags.GranSimStats.Full) 
2440         DumpGranEvent(GR_START,tso);
2441 #elif defined(PARALLEL_HASKELL)
2442     if (RtsFlags.ParFlags.ParStats.Full) 
2443         DumpGranEvent(GR_STARTQ,tso);
2444     /* HACk to avoid SCHEDULE 
2445        LastTSO = tso; */
2446 #endif
2447     
2448     /* Link the new thread on the global thread list.
2449      */
2450     ACQUIRE_LOCK(&sched_mutex);
2451     tso->id = next_thread_id++;  // while we have the mutex
2452     tso->global_link = all_threads;
2453     all_threads = tso;
2454     RELEASE_LOCK(&sched_mutex);
2455     
2456 #if defined(DIST)
2457     tso->dist.priority = MandatoryPriority; //by default that is...
2458 #endif
2459     
2460 #if defined(GRAN)
2461     tso->gran.pri = pri;
2462 # if defined(DEBUG)
2463     tso->gran.magic = TSO_MAGIC; // debugging only
2464 # endif
2465     tso->gran.sparkname   = 0;
2466     tso->gran.startedat   = CURRENT_TIME; 
2467     tso->gran.exported    = 0;
2468     tso->gran.basicblocks = 0;
2469     tso->gran.allocs      = 0;
2470     tso->gran.exectime    = 0;
2471     tso->gran.fetchtime   = 0;
2472     tso->gran.fetchcount  = 0;
2473     tso->gran.blocktime   = 0;
2474     tso->gran.blockcount  = 0;
2475     tso->gran.blockedat   = 0;
2476     tso->gran.globalsparks = 0;
2477     tso->gran.localsparks  = 0;
2478     if (RtsFlags.GranFlags.Light)
2479         tso->gran.clock  = Now; /* local clock */
2480     else
2481         tso->gran.clock  = 0;
2482     
2483     IF_DEBUG(gran,printTSO(tso));
2484 #elif defined(PARALLEL_HASKELL)
2485 # if defined(DEBUG)
2486     tso->par.magic = TSO_MAGIC; // debugging only
2487 # endif
2488     tso->par.sparkname   = 0;
2489     tso->par.startedat   = CURRENT_TIME; 
2490     tso->par.exported    = 0;
2491     tso->par.basicblocks = 0;
2492     tso->par.allocs      = 0;
2493     tso->par.exectime    = 0;
2494     tso->par.fetchtime   = 0;
2495     tso->par.fetchcount  = 0;
2496     tso->par.blocktime   = 0;
2497     tso->par.blockcount  = 0;
2498     tso->par.blockedat   = 0;
2499     tso->par.globalsparks = 0;
2500     tso->par.localsparks  = 0;
2501 #endif
2502     
2503 #if defined(GRAN)
2504     globalGranStats.tot_threads_created++;
2505     globalGranStats.threads_created_on_PE[CurrentProc]++;
2506     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2507     globalGranStats.tot_sq_probes++;
2508 #elif defined(PARALLEL_HASKELL)
2509     // collect parallel global statistics (currently done together with GC stats)
2510     if (RtsFlags.ParFlags.ParStats.Global &&
2511         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2512         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2513         globalParStats.tot_threads_created++;
2514     }
2515 #endif 
2516     
2517 #if defined(GRAN)
2518     IF_GRAN_DEBUG(pri,
2519                   sched_belch("==__ schedule: Created TSO %d (%p);",
2520                               CurrentProc, tso, tso->id));
2521 #elif defined(PARALLEL_HASKELL)
2522     IF_PAR_DEBUG(verbose,
2523                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2524                              (long)tso->id, tso, advisory_thread_count));
2525 #else
2526     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2527                                    (long)tso->id, (long)tso->stack_size));
2528 #endif    
2529     return tso;
2530 }
2531
2532 #if defined(PAR)
2533 /* RFP:
2534    all parallel thread creation calls should fall through the following routine.
2535 */
2536 StgTSO *
2537 createThreadFromSpark(rtsSpark spark) 
2538 { StgTSO *tso;
2539   ASSERT(spark != (rtsSpark)NULL);
2540 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2541   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2542   { threadsIgnored++;
2543     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2544           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2545     return END_TSO_QUEUE;
2546   }
2547   else
2548   { threadsCreated++;
2549     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2550     if (tso==END_TSO_QUEUE)     
2551       barf("createSparkThread: Cannot create TSO");
2552 #if defined(DIST)
2553     tso->priority = AdvisoryPriority;
2554 #endif
2555     pushClosure(tso,spark);
2556     addToRunQueue(tso);
2557     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2558   }
2559   return tso;
2560 }
2561 #endif
2562
2563 /*
2564   Turn a spark into a thread.
2565   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2566 */
2567 #if 0
2568 StgTSO *
2569 activateSpark (rtsSpark spark) 
2570 {
2571   StgTSO *tso;
2572
2573   tso = createSparkThread(spark);
2574   if (RtsFlags.ParFlags.ParStats.Full) {   
2575     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2576       IF_PAR_DEBUG(verbose,
2577                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2578                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2579   }
2580   // ToDo: fwd info on local/global spark to thread -- HWL
2581   // tso->gran.exported =  spark->exported;
2582   // tso->gran.locked =   !spark->global;
2583   // tso->gran.sparkname = spark->name;
2584
2585   return tso;
2586 }
2587 #endif
2588
2589 /* ---------------------------------------------------------------------------
2590  * scheduleThread()
2591  *
2592  * scheduleThread puts a thread on the end  of the runnable queue.
2593  * This will usually be done immediately after a thread is created.
2594  * The caller of scheduleThread must create the thread using e.g.
2595  * createThread and push an appropriate closure
2596  * on this thread's stack before the scheduler is invoked.
2597  * ------------------------------------------------------------------------ */
2598
2599 void
2600 scheduleThread(Capability *cap, StgTSO *tso)
2601 {
2602     // The thread goes at the *end* of the run-queue, to avoid possible
2603     // starvation of any threads already on the queue.
2604     appendToRunQueue(cap,tso);
2605 }
2606
2607 Capability *
2608 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2609 {
2610     Task *task;
2611
2612     // We already created/initialised the Task
2613     task = cap->running_task;
2614
2615     // This TSO is now a bound thread; make the Task and TSO
2616     // point to each other.
2617     tso->bound = task;
2618
2619     task->tso = tso;
2620     task->ret = ret;
2621     task->stat = NoStatus;
2622
2623     appendToRunQueue(cap,tso);
2624
2625     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2626
2627 #if defined(GRAN)
2628     /* GranSim specific init */
2629     CurrentTSO = m->tso;                // the TSO to run
2630     procStatus[MainProc] = Busy;        // status of main PE
2631     CurrentProc = MainProc;             // PE to run it on
2632 #endif
2633
2634     cap = schedule(cap,task);
2635
2636     ASSERT(task->stat != NoStatus);
2637     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2638
2639     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2640     return cap;
2641 }
2642
2643 /* ----------------------------------------------------------------------------
2644  * Starting Tasks
2645  * ------------------------------------------------------------------------- */
2646
2647 #if defined(THREADED_RTS)
2648 void
2649 workerStart(Task *task)
2650 {
2651     Capability *cap;
2652
2653     // See startWorkerTask().
2654     ACQUIRE_LOCK(&task->lock);
2655     cap = task->cap;
2656     RELEASE_LOCK(&task->lock);
2657
2658     // set the thread-local pointer to the Task:
2659     taskEnter(task);
2660
2661     // schedule() runs without a lock.
2662     cap = schedule(cap,task);
2663
2664     // On exit from schedule(), we have a Capability.
2665     releaseCapability(cap);
2666     taskStop(task);
2667 }
2668 #endif
2669
2670 /* ---------------------------------------------------------------------------
2671  * initScheduler()
2672  *
2673  * Initialise the scheduler.  This resets all the queues - if the
2674  * queues contained any threads, they'll be garbage collected at the
2675  * next pass.
2676  *
2677  * ------------------------------------------------------------------------ */
2678
2679 void 
2680 initScheduler(void)
2681 {
2682 #if defined(GRAN)
2683   nat i;
2684   for (i=0; i<=MAX_PROC; i++) {
2685     run_queue_hds[i]      = END_TSO_QUEUE;
2686     run_queue_tls[i]      = END_TSO_QUEUE;
2687     blocked_queue_hds[i]  = END_TSO_QUEUE;
2688     blocked_queue_tls[i]  = END_TSO_QUEUE;
2689     ccalling_threadss[i]  = END_TSO_QUEUE;
2690     blackhole_queue[i]    = END_TSO_QUEUE;
2691     sleeping_queue        = END_TSO_QUEUE;
2692   }
2693 #elif !defined(THREADED_RTS)
2694   blocked_queue_hd  = END_TSO_QUEUE;
2695   blocked_queue_tl  = END_TSO_QUEUE;
2696   sleeping_queue    = END_TSO_QUEUE;
2697 #endif
2698
2699   blackhole_queue   = END_TSO_QUEUE;
2700   all_threads       = END_TSO_QUEUE;
2701
2702   context_switch = 0;
2703   interrupted    = 0;
2704
2705   RtsFlags.ConcFlags.ctxtSwitchTicks =
2706       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2707       
2708 #if defined(THREADED_RTS)
2709   /* Initialise the mutex and condition variables used by
2710    * the scheduler. */
2711   initMutex(&sched_mutex);
2712 #endif
2713   
2714   ACQUIRE_LOCK(&sched_mutex);
2715
2716   /* A capability holds the state a native thread needs in
2717    * order to execute STG code. At least one capability is
2718    * floating around (only THREADED_RTS builds have more than one).
2719    */
2720   initCapabilities();
2721
2722   initTaskManager();
2723
2724 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2725   initSparkPools();
2726 #endif
2727
2728 #if defined(THREADED_RTS)
2729   /*
2730    * Eagerly start one worker to run each Capability, except for
2731    * Capability 0.  The idea is that we're probably going to start a
2732    * bound thread on Capability 0 pretty soon, so we don't want a
2733    * worker task hogging it.
2734    */
2735   { 
2736       nat i;
2737       Capability *cap;
2738       for (i = 1; i < n_capabilities; i++) {
2739           cap = &capabilities[i];
2740           ACQUIRE_LOCK(&cap->lock);
2741           startWorkerTask(cap, workerStart);
2742           RELEASE_LOCK(&cap->lock);
2743       }
2744   }
2745 #endif
2746
2747   RELEASE_LOCK(&sched_mutex);
2748 }
2749
2750 void
2751 exitScheduler( void )
2752 {
2753     interrupted = rtsTrue;
2754     shutting_down_scheduler = rtsTrue;
2755
2756 #if defined(THREADED_RTS)
2757     { 
2758         Task *task;
2759         nat i;
2760         
2761         ACQUIRE_LOCK(&sched_mutex);
2762         task = newBoundTask();
2763         RELEASE_LOCK(&sched_mutex);
2764
2765         for (i = 0; i < n_capabilities; i++) {
2766             shutdownCapability(&capabilities[i], task);
2767         }
2768         boundTaskExiting(task);
2769         stopTaskManager();
2770     }
2771 #endif
2772 }
2773
2774 /* ---------------------------------------------------------------------------
2775    Where are the roots that we know about?
2776
2777         - all the threads on the runnable queue
2778         - all the threads on the blocked queue
2779         - all the threads on the sleeping queue
2780         - all the thread currently executing a _ccall_GC
2781         - all the "main threads"
2782      
2783    ------------------------------------------------------------------------ */
2784
2785 /* This has to be protected either by the scheduler monitor, or by the
2786         garbage collection monitor (probably the latter).
2787         KH @ 25/10/99
2788 */
2789
2790 void
2791 GetRoots( evac_fn evac )
2792 {
2793     nat i;
2794     Capability *cap;
2795     Task *task;
2796
2797 #if defined(GRAN)
2798     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2799         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2800             evac((StgClosure **)&run_queue_hds[i]);
2801         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2802             evac((StgClosure **)&run_queue_tls[i]);
2803         
2804         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2805             evac((StgClosure **)&blocked_queue_hds[i]);
2806         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2807             evac((StgClosure **)&blocked_queue_tls[i]);
2808         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2809             evac((StgClosure **)&ccalling_threads[i]);
2810     }
2811
2812     markEventQueue();
2813
2814 #else /* !GRAN */
2815
2816     for (i = 0; i < n_capabilities; i++) {
2817         cap = &capabilities[i];
2818         evac((StgClosure **)&cap->run_queue_hd);
2819         evac((StgClosure **)&cap->run_queue_tl);
2820         
2821         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2822              task=task->next) {
2823             evac((StgClosure **)&task->suspended_tso);
2824         }
2825     }
2826     
2827 #if !defined(THREADED_RTS)
2828     evac((StgClosure **)(void *)&blocked_queue_hd);
2829     evac((StgClosure **)(void *)&blocked_queue_tl);
2830     evac((StgClosure **)(void *)&sleeping_queue);
2831 #endif 
2832 #endif
2833
2834     // evac((StgClosure **)&blackhole_queue);
2835
2836 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2837     markSparkQueue(evac);
2838 #endif
2839     
2840 #if defined(RTS_USER_SIGNALS)
2841     // mark the signal handlers (signals should be already blocked)
2842     markSignalHandlers(evac);
2843 #endif
2844 }
2845
2846 /* -----------------------------------------------------------------------------
2847    performGC
2848
2849    This is the interface to the garbage collector from Haskell land.
2850    We provide this so that external C code can allocate and garbage
2851    collect when called from Haskell via _ccall_GC.
2852
2853    It might be useful to provide an interface whereby the programmer
2854    can specify more roots (ToDo).
2855    
2856    This needs to be protected by the GC condition variable above.  KH.
2857    -------------------------------------------------------------------------- */
2858
2859 static void (*extra_roots)(evac_fn);
2860
2861 static void
2862 performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
2863 {
2864     Task *task = myTask();
2865
2866     if (task == NULL) {
2867         ACQUIRE_LOCK(&sched_mutex);
2868         task = newBoundTask();
2869         RELEASE_LOCK(&sched_mutex);
2870         scheduleDoGC(NULL,task,force_major, get_roots);
2871         boundTaskExiting(task);
2872     } else {
2873         scheduleDoGC(NULL,task,force_major, get_roots);
2874     }
2875 }
2876
2877 void
2878 performGC(void)
2879 {
2880     performGC_(rtsFalse, GetRoots);
2881 }
2882
2883 void
2884 performMajorGC(void)
2885 {
2886     performGC_(rtsTrue, GetRoots);
2887 }
2888
2889 static void
2890 AllRoots(evac_fn evac)
2891 {
2892     GetRoots(evac);             // the scheduler's roots
2893     extra_roots(evac);          // the user's roots
2894 }
2895
2896 void
2897 performGCWithRoots(void (*get_roots)(evac_fn))
2898 {
2899     extra_roots = get_roots;
2900     performGC_(rtsFalse, AllRoots);
2901 }
2902
2903 /* -----------------------------------------------------------------------------
2904    Stack overflow
2905
2906    If the thread has reached its maximum stack size, then raise the
2907    StackOverflow exception in the offending thread.  Otherwise
2908    relocate the TSO into a larger chunk of memory and adjust its stack
2909    size appropriately.
2910    -------------------------------------------------------------------------- */
2911
2912 static StgTSO *
2913 threadStackOverflow(Capability *cap, StgTSO *tso)
2914 {
2915   nat new_stack_size, stack_words;
2916   lnat new_tso_size;
2917   StgPtr new_sp;
2918   StgTSO *dest;
2919
2920   IF_DEBUG(sanity,checkTSO(tso));
2921   if (tso->stack_size >= tso->max_stack_size) {
2922
2923     IF_DEBUG(gc,
2924              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2925                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2926              /* If we're debugging, just print out the top of the stack */
2927              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2928                                               tso->sp+64)));
2929
2930     /* Send this thread the StackOverflow exception */
2931     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2932     return tso;
2933   }
2934
2935   /* Try to double the current stack size.  If that takes us over the
2936    * maximum stack size for this thread, then use the maximum instead.
2937    * Finally round up so the TSO ends up as a whole number of blocks.
2938    */
2939   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2940   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2941                                        TSO_STRUCT_SIZE)/sizeof(W_);
2942   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2943   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2944
2945   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2946
2947   dest = (StgTSO *)allocate(new_tso_size);
2948   TICK_ALLOC_TSO(new_stack_size,0);
2949
2950   /* copy the TSO block and the old stack into the new area */
2951   memcpy(dest,tso,TSO_STRUCT_SIZE);
2952   stack_words = tso->stack + tso->stack_size - tso->sp;
2953   new_sp = (P_)dest + new_tso_size - stack_words;
2954   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2955
2956   /* relocate the stack pointers... */
2957   dest->sp         = new_sp;
2958   dest->stack_size = new_stack_size;
2959         
2960   /* Mark the old TSO as relocated.  We have to check for relocated
2961    * TSOs in the garbage collector and any primops that deal with TSOs.
2962    *
2963    * It's important to set the sp value to just beyond the end
2964    * of the stack, so we don't attempt to scavenge any part of the
2965    * dead TSO's stack.
2966    */
2967   tso->what_next = ThreadRelocated;
2968   tso->link = dest;
2969   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2970   tso->why_blocked = NotBlocked;
2971
2972   IF_PAR_DEBUG(verbose,
2973                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2974                      tso->id, tso, tso->stack_size);
2975                /* If we're debugging, just print out the top of the stack */
2976                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2977                                                 tso->sp+64)));
2978   
2979   IF_DEBUG(sanity,checkTSO(tso));
2980 #if 0
2981   IF_DEBUG(scheduler,printTSO(dest));
2982 #endif
2983
2984   return dest;
2985 }
2986
2987 /* ---------------------------------------------------------------------------
2988    Wake up a queue that was blocked on some resource.
2989    ------------------------------------------------------------------------ */
2990
2991 #if defined(GRAN)
2992 STATIC_INLINE void
2993 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2994 {
2995 }
2996 #elif defined(PARALLEL_HASKELL)
2997 STATIC_INLINE void
2998 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2999 {
3000   /* write RESUME events to log file and
3001      update blocked and fetch time (depending on type of the orig closure) */
3002   if (RtsFlags.ParFlags.ParStats.Full) {
3003     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
3004                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
3005                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
3006     if (emptyRunQueue())
3007       emitSchedule = rtsTrue;
3008
3009     switch (get_itbl(node)->type) {
3010         case FETCH_ME_BQ:
3011           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3012           break;
3013         case RBH:
3014         case FETCH_ME:
3015         case BLACKHOLE_BQ:
3016           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3017           break;
3018 #ifdef DIST
3019         case MVAR:
3020           break;
3021 #endif    
3022         default:
3023           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
3024         }
3025       }
3026 }
3027 #endif
3028
3029 #if defined(GRAN)
3030 StgBlockingQueueElement *
3031 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3032 {
3033     StgTSO *tso;
3034     PEs node_loc, tso_loc;
3035
3036     node_loc = where_is(node); // should be lifted out of loop
3037     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3038     tso_loc = where_is((StgClosure *)tso);
3039     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3040       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
3041       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3042       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3043       // insertThread(tso, node_loc);
3044       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3045                 ResumeThread,
3046                 tso, node, (rtsSpark*)NULL);
3047       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3048       // len_local++;
3049       // len++;
3050     } else { // TSO is remote (actually should be FMBQ)
3051       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3052                                   RtsFlags.GranFlags.Costs.gunblocktime +
3053                                   RtsFlags.GranFlags.Costs.latency;
3054       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3055                 UnblockThread,
3056                 tso, node, (rtsSpark*)NULL);
3057       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3058       // len++;
3059     }
3060     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3061     IF_GRAN_DEBUG(bq,
3062                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3063                           (node_loc==tso_loc ? "Local" : "Global"), 
3064                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3065     tso->block_info.closure = NULL;
3066     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
3067                              tso->id, tso));
3068 }
3069 #elif defined(PARALLEL_HASKELL)
3070 StgBlockingQueueElement *
3071 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3072 {
3073     StgBlockingQueueElement *next;
3074
3075     switch (get_itbl(bqe)->type) {
3076     case TSO:
3077       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3078       /* if it's a TSO just push it onto the run_queue */
3079       next = bqe->link;
3080       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3081       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3082       threadRunnable();
3083       unblockCount(bqe, node);
3084       /* reset blocking status after dumping event */
3085       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3086       break;
3087
3088     case BLOCKED_FETCH:
3089       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3090       next = bqe->link;
3091       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3092       PendingFetches = (StgBlockedFetch *)bqe;
3093       break;
3094
3095 # if defined(DEBUG)
3096       /* can ignore this case in a non-debugging setup; 
3097          see comments on RBHSave closures above */
3098     case CONSTR:
3099       /* check that the closure is an RBHSave closure */
3100       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3101              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3102              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3103       break;
3104
3105     default:
3106       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3107            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3108            (StgClosure *)bqe);
3109 # endif
3110     }
3111   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3112   return next;
3113 }
3114 #endif
3115
3116 StgTSO *
3117 unblockOne(Capability *cap, StgTSO *tso)
3118 {
3119   StgTSO *next;
3120
3121   ASSERT(get_itbl(tso)->type == TSO);
3122   ASSERT(tso->why_blocked != NotBlocked);
3123   tso->why_blocked = NotBlocked;
3124   next = tso->link;
3125   tso->link = END_TSO_QUEUE;
3126
3127   // We might have just migrated this TSO to our Capability:
3128   if (tso->bound) {
3129       tso->bound->cap = cap;
3130   }
3131
3132   appendToRunQueue(cap,tso);
3133
3134   // we're holding a newly woken thread, make sure we context switch
3135   // quickly so we can migrate it if necessary.
3136   context_switch = 1;
3137   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3138   return next;
3139 }
3140
3141
3142 #if defined(GRAN)
3143 void 
3144 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3145 {
3146   StgBlockingQueueElement *bqe;
3147   PEs node_loc;
3148   nat len = 0; 
3149
3150   IF_GRAN_DEBUG(bq, 
3151                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3152                       node, CurrentProc, CurrentTime[CurrentProc], 
3153                       CurrentTSO->id, CurrentTSO));
3154
3155   node_loc = where_is(node);
3156
3157   ASSERT(q == END_BQ_QUEUE ||
3158          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3159          get_itbl(q)->type == CONSTR); // closure (type constructor)
3160   ASSERT(is_unique(node));
3161
3162   /* FAKE FETCH: magically copy the node to the tso's proc;
3163      no Fetch necessary because in reality the node should not have been 
3164      moved to the other PE in the first place
3165   */
3166   if (CurrentProc!=node_loc) {
3167     IF_GRAN_DEBUG(bq, 
3168                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3169                         node, node_loc, CurrentProc, CurrentTSO->id, 
3170                         // CurrentTSO, where_is(CurrentTSO),
3171                         node->header.gran.procs));
3172     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3173     IF_GRAN_DEBUG(bq, 
3174                   debugBelch("## new bitmask of node %p is %#x\n",
3175                         node, node->header.gran.procs));
3176     if (RtsFlags.GranFlags.GranSimStats.Global) {
3177       globalGranStats.tot_fake_fetches++;
3178     }
3179   }
3180
3181   bqe = q;
3182   // ToDo: check: ASSERT(CurrentProc==node_loc);
3183   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3184     //next = bqe->link;
3185     /* 
3186        bqe points to the current element in the queue
3187        next points to the next element in the queue
3188     */
3189     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3190     //tso_loc = where_is(tso);
3191     len++;
3192     bqe = unblockOne(bqe, node);
3193   }
3194
3195   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3196      the closure to make room for the anchor of the BQ */
3197   if (bqe!=END_BQ_QUEUE) {
3198     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3199     /*
3200     ASSERT((info_ptr==&RBH_Save_0_info) ||
3201            (info_ptr==&RBH_Save_1_info) ||
3202            (info_ptr==&RBH_Save_2_info));
3203     */
3204     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3205     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3206     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3207
3208     IF_GRAN_DEBUG(bq,
3209                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3210                         node, info_type(node)));
3211   }
3212
3213   /* statistics gathering */
3214   if (RtsFlags.GranFlags.GranSimStats.Global) {
3215     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3216     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3217     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3218     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3219   }
3220   IF_GRAN_DEBUG(bq,
3221                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3222                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3223 }
3224 #elif defined(PARALLEL_HASKELL)
3225 void 
3226 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3227 {
3228   StgBlockingQueueElement *bqe;
3229
3230   IF_PAR_DEBUG(verbose, 
3231                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3232                      node, mytid));
3233 #ifdef DIST  
3234   //RFP
3235   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3236     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3237     return;
3238   }
3239 #endif
3240   
3241   ASSERT(q == END_BQ_QUEUE ||
3242          get_itbl(q)->type == TSO ||           
3243          get_itbl(q)->type == BLOCKED_FETCH || 
3244          get_itbl(q)->type == CONSTR); 
3245
3246   bqe = q;
3247   while (get_itbl(bqe)->type==TSO || 
3248          get_itbl(bqe)->type==BLOCKED_FETCH) {
3249     bqe = unblockOne(bqe, node);
3250   }
3251 }
3252
3253 #else   /* !GRAN && !PARALLEL_HASKELL */
3254
3255 void
3256 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3257 {
3258     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3259                              // Exception.cmm
3260     while (tso != END_TSO_QUEUE) {
3261         tso = unblockOne(cap,tso);
3262     }
3263 }
3264 #endif
3265
3266 /* ---------------------------------------------------------------------------
3267    Interrupt execution
3268    - usually called inside a signal handler so it mustn't do anything fancy.   
3269    ------------------------------------------------------------------------ */
3270
3271 void
3272 interruptStgRts(void)
3273 {
3274     interrupted    = 1;
3275     context_switch = 1;
3276 #if defined(THREADED_RTS)
3277     prodAllCapabilities();
3278 #endif
3279 }
3280
3281 /* -----------------------------------------------------------------------------
3282    Unblock a thread
3283
3284    This is for use when we raise an exception in another thread, which
3285    may be blocked.
3286    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3287    -------------------------------------------------------------------------- */
3288
3289 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3290 /*
3291   NB: only the type of the blocking queue is different in GranSim and GUM
3292       the operations on the queue-elements are the same
3293       long live polymorphism!
3294
3295   Locks: sched_mutex is held upon entry and exit.
3296
3297 */
3298 static void
3299 unblockThread(Capability *cap, StgTSO *tso)
3300 {
3301   StgBlockingQueueElement *t, **last;
3302
3303   switch (tso->why_blocked) {
3304
3305   case NotBlocked:
3306     return;  /* not blocked */
3307
3308   case BlockedOnSTM:
3309     // Be careful: nothing to do here!  We tell the scheduler that the thread
3310     // is runnable and we leave it to the stack-walking code to abort the 
3311     // transaction while unwinding the stack.  We should perhaps have a debugging
3312     // test to make sure that this really happens and that the 'zombie' transaction
3313     // does not get committed.
3314     goto done;
3315
3316   case BlockedOnMVar:
3317     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3318     {
3319       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3320       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3321
3322       last = (StgBlockingQueueElement **)&mvar->head;
3323       for (t = (StgBlockingQueueElement *)mvar->head; 
3324            t != END_BQ_QUEUE; 
3325            last = &t->link, last_tso = t, t = t->link) {
3326         if (t == (StgBlockingQueueElement *)tso) {
3327           *last = (StgBlockingQueueElement *)tso->link;
3328           if (mvar->tail == tso) {
3329             mvar->tail = (StgTSO *)last_tso;
3330           }
3331           goto done;
3332         }
3333       }
3334       barf("unblockThread (MVAR): TSO not found");
3335     }
3336
3337   case BlockedOnBlackHole:
3338     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3339     {
3340       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3341
3342       last = &bq->blocking_queue;
3343       for (t = bq->blocking_queue; 
3344            t != END_BQ_QUEUE; 
3345            last = &t->link, t = t->link) {
3346         if (t == (StgBlockingQueueElement *)tso) {
3347           *last = (StgBlockingQueueElement *)tso->link;
3348           goto done;
3349         }
3350       }
3351       barf("unblockThread (BLACKHOLE): TSO not found");
3352     }
3353
3354   case BlockedOnException:
3355     {
3356       StgTSO *target  = tso->block_info.tso;
3357
3358       ASSERT(get_itbl(target)->type == TSO);
3359
3360       if (target->what_next == ThreadRelocated) {
3361           target = target->link;
3362           ASSERT(get_itbl(target)->type == TSO);
3363       }
3364
3365       ASSERT(target->blocked_exceptions != NULL);
3366
3367       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3368       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3369            t != END_BQ_QUEUE; 
3370            last = &t->link, t = t->link) {
3371         ASSERT(get_itbl(t)->type == TSO);
3372         if (t == (StgBlockingQueueElement *)tso) {
3373           *last = (StgBlockingQueueElement *)tso->link;
3374           goto done;
3375         }
3376       }
3377       barf("unblockThread (Exception): TSO not found");
3378     }
3379
3380   case BlockedOnRead:
3381   case BlockedOnWrite:
3382 #if defined(mingw32_HOST_OS)
3383   case BlockedOnDoProc:
3384 #endif
3385     {
3386       /* take TSO off blocked_queue */
3387       StgBlockingQueueElement *prev = NULL;
3388       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3389            prev = t, t = t->link) {
3390         if (t == (StgBlockingQueueElement *)tso) {
3391           if (prev == NULL) {
3392             blocked_queue_hd = (StgTSO *)t->link;
3393             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3394               blocked_queue_tl = END_TSO_QUEUE;
3395             }
3396           } else {
3397             prev->link = t->link;
3398             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3399               blocked_queue_tl = (StgTSO *)prev;
3400             }
3401           }
3402 #if defined(mingw32_HOST_OS)
3403           /* (Cooperatively) signal that the worker thread should abort
3404            * the request.
3405            */
3406           abandonWorkRequest(tso->block_info.async_result->reqID);
3407 #endif
3408           goto done;
3409         }
3410       }
3411       barf("unblockThread (I/O): TSO not found");
3412     }
3413
3414   case BlockedOnDelay:
3415     {
3416       /* take TSO off sleeping_queue */
3417       StgBlockingQueueElement *prev = NULL;
3418       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3419            prev = t, t = t->link) {
3420         if (t == (StgBlockingQueueElement *)tso) {
3421           if (prev == NULL) {
3422             sleeping_queue = (StgTSO *)t->link;
3423           } else {
3424             prev->link = t->link;
3425           }
3426           goto done;
3427         }
3428       }
3429       barf("unblockThread (delay): TSO not found");
3430     }
3431
3432   default:
3433     barf("unblockThread");
3434   }
3435
3436  done:
3437   tso->link = END_TSO_QUEUE;
3438   tso->why_blocked = NotBlocked;
3439   tso->block_info.closure = NULL;
3440   pushOnRunQueue(cap,tso);
3441 }
3442 #else
3443 static void
3444 unblockThread(Capability *cap, StgTSO *tso)
3445 {
3446   StgTSO *t, **last;
3447   
3448   /* To avoid locking unnecessarily. */
3449   if (tso->why_blocked == NotBlocked) {
3450     return;
3451   }
3452
3453   switch (tso->why_blocked) {
3454
3455   case BlockedOnSTM:
3456     // Be careful: nothing to do here!  We tell the scheduler that the thread
3457     // is runnable and we leave it to the stack-walking code to abort the 
3458     // transaction while unwinding the stack.  We should perhaps have a debugging
3459     // test to make sure that this really happens and that the 'zombie' transaction
3460     // does not get committed.
3461     goto done;
3462
3463   case BlockedOnMVar:
3464     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3465     {
3466       StgTSO *last_tso = END_TSO_QUEUE;
3467       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3468
3469       last = &mvar->head;
3470       for (t = mvar->head; t != END_TSO_QUEUE; 
3471            last = &t->link, last_tso = t, t = t->link) {
3472         if (t == tso) {
3473           *last = tso->link;
3474           if (mvar->tail == tso) {
3475             mvar->tail = last_tso;
3476           }
3477           goto done;
3478         }
3479       }
3480       barf("unblockThread (MVAR): TSO not found");
3481     }
3482
3483   case BlockedOnBlackHole:
3484     {
3485       last = &blackhole_queue;
3486       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3487            last = &t->link, t = t->link) {
3488         if (t == tso) {
3489           *last = tso->link;
3490           goto done;
3491         }
3492       }
3493       barf("unblockThread (BLACKHOLE): TSO not found");
3494     }
3495
3496   case BlockedOnException:
3497     {
3498       StgTSO *target  = tso->block_info.tso;
3499
3500       ASSERT(get_itbl(target)->type == TSO);
3501
3502       while (target->what_next == ThreadRelocated) {
3503           target = target->link;
3504           ASSERT(get_itbl(target)->type == TSO);
3505       }
3506       
3507       ASSERT(target->blocked_exceptions != NULL);
3508
3509       last = &target->blocked_exceptions;
3510       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3511            last = &t->link, t = t->link) {
3512         ASSERT(get_itbl(t)->type == TSO);
3513         if (t == tso) {
3514           *last = tso->link;
3515           goto done;
3516         }
3517       }
3518       barf("unblockThread (Exception): TSO not found");
3519     }
3520
3521 #if !defined(THREADED_RTS)
3522   case BlockedOnRead:
3523   case BlockedOnWrite:
3524 #if defined(mingw32_HOST_OS)
3525   case BlockedOnDoProc:
3526 #endif
3527     {
3528       StgTSO *prev = NULL;
3529       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3530            prev = t, t = t->link) {
3531         if (t == tso) {
3532           if (prev == NULL) {
3533             blocked_queue_hd = t->link;
3534             if (blocked_queue_tl == t) {
3535               blocked_queue_tl = END_TSO_QUEUE;
3536             }
3537           } else {
3538             prev->link = t->link;
3539             if (blocked_queue_tl == t) {
3540               blocked_queue_tl = prev;
3541             }
3542           }
3543 #if defined(mingw32_HOST_OS)
3544           /* (Cooperatively) signal that the worker thread should abort
3545            * the request.
3546            */
3547           abandonWorkRequest(tso->block_info.async_result->reqID);
3548 #endif
3549           goto done;
3550         }
3551       }
3552       barf("unblockThread (I/O): TSO not found");
3553     }
3554
3555   case BlockedOnDelay:
3556     {
3557       StgTSO *prev = NULL;
3558       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3559            prev = t, t = t->link) {
3560         if (t == tso) {
3561           if (prev == NULL) {
3562             sleeping_queue = t->link;
3563           } else {
3564             prev->link = t->link;
3565           }
3566           goto done;
3567         }
3568       }
3569       barf("unblockThread (delay): TSO not found");
3570     }
3571 #endif
3572
3573   default:
3574     barf("unblockThread");
3575   }
3576
3577  done:
3578   tso->link = END_TSO_QUEUE;
3579   tso->why_blocked = NotBlocked;
3580   tso->block_info.closure = NULL;
3581   appendToRunQueue(cap,tso);
3582 }
3583 #endif
3584
3585 /* -----------------------------------------------------------------------------
3586  * checkBlackHoles()
3587  *
3588  * Check the blackhole_queue for threads that can be woken up.  We do
3589  * this periodically: before every GC, and whenever the run queue is
3590  * empty.
3591  *
3592  * An elegant solution might be to just wake up all the blocked
3593  * threads with awakenBlockedQueue occasionally: they'll go back to
3594  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3595  * doesn't give us a way to tell whether we've actually managed to
3596  * wake up any threads, so we would be busy-waiting.
3597  *
3598  * -------------------------------------------------------------------------- */
3599
3600 static rtsBool
3601 checkBlackHoles (Capability *cap)
3602 {
3603     StgTSO **prev, *t;
3604     rtsBool any_woke_up = rtsFalse;
3605     StgHalfWord type;
3606
3607     // blackhole_queue is global:
3608     ASSERT_LOCK_HELD(&sched_mutex);
3609
3610     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3611
3612     // ASSUMES: sched_mutex
3613     prev = &blackhole_queue;
3614     t = blackhole_queue;
3615     while (t != END_TSO_QUEUE) {
3616         ASSERT(t->why_blocked == BlockedOnBlackHole);
3617         type = get_itbl(t->block_info.closure)->type;
3618         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3619             IF_DEBUG(sanity,checkTSO(t));
3620             t = unblockOne(cap, t);
3621             // urk, the threads migrate to the current capability
3622             // here, but we'd like to keep them on the original one.
3623             *prev = t;
3624             any_woke_up = rtsTrue;
3625         } else {
3626             prev = &t->link;
3627             t = t->link;
3628         }
3629     }
3630
3631     return any_woke_up;
3632 }
3633
3634 /* -----------------------------------------------------------------------------
3635  * raiseAsync()
3636  *
3637  * The following function implements the magic for raising an
3638  * asynchronous exception in an existing thread.
3639  *
3640  * We first remove the thread from any queue on which it might be
3641  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3642  *
3643  * We strip the stack down to the innermost CATCH_FRAME, building
3644  * thunks in the heap for all the active computations, so they can 
3645  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3646  * an application of the handler to the exception, and push it on
3647  * the top of the stack.
3648  * 
3649  * How exactly do we save all the active computations?  We create an
3650  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3651  * AP_STACKs pushes everything from the corresponding update frame
3652  * upwards onto the stack.  (Actually, it pushes everything up to the
3653  * next update frame plus a pointer to the next AP_STACK object.
3654  * Entering the next AP_STACK object pushes more onto the stack until we
3655  * reach the last AP_STACK object - at which point the stack should look
3656  * exactly as it did when we killed the TSO and we can continue
3657  * execution by entering the closure on top of the stack.
3658  *
3659  * We can also kill a thread entirely - this happens if either (a) the 
3660  * exception passed to raiseAsync is NULL, or (b) there's no
3661  * CATCH_FRAME on the stack.  In either case, we strip the entire
3662  * stack and replace the thread with a zombie.
3663  *
3664  * ToDo: in THREADED_RTS mode, this function is only safe if either
3665  * (a) we hold all the Capabilities (eg. in GC, or if there is only
3666  * one Capability), or (b) we own the Capability that the TSO is
3667  * currently blocked on or on the run queue of.
3668  *
3669  * -------------------------------------------------------------------------- */
3670  
3671 void
3672 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3673 {
3674     raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3675 }
3676
3677 void
3678 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3679 {
3680     raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3681 }
3682
3683 static void
3684 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3685             rtsBool stop_at_atomically, StgPtr stop_here)
3686 {
3687     StgRetInfoTable *info;
3688     StgPtr sp, frame;
3689     nat i;
3690   
3691     // Thread already dead?
3692     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3693         return;
3694     }
3695
3696     IF_DEBUG(scheduler, 
3697              sched_belch("raising exception in thread %ld.", (long)tso->id));
3698     
3699     // Remove it from any blocking queues
3700     unblockThread(cap,tso);
3701
3702     // mark it dirty; we're about to change its stack.
3703     dirtyTSO(tso);
3704
3705     sp = tso->sp;
3706     
3707     // The stack freezing code assumes there's a closure pointer on
3708     // the top of the stack, so we have to arrange that this is the case...
3709     //
3710     if (sp[0] == (W_)&stg_enter_info) {
3711         sp++;
3712     } else {
3713         sp--;
3714         sp[0] = (W_)&stg_dummy_ret_closure;
3715     }
3716
3717     frame = sp + 1;
3718     while (stop_here == NULL || frame < stop_here) {
3719
3720         // 1. Let the top of the stack be the "current closure"
3721         //
3722         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3723         // CATCH_FRAME.
3724         //
3725         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3726         // current closure applied to the chunk of stack up to (but not
3727         // including) the update frame.  This closure becomes the "current
3728         // closure".  Go back to step 2.
3729         //
3730         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3731         // top of the stack applied to the exception.
3732         // 
3733         // 5. If it's a STOP_FRAME, then kill the thread.
3734         // 
3735         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3736         // transaction
3737        
3738         info = get_ret_itbl((StgClosure *)frame);
3739
3740         switch (info->i.type) {
3741
3742         case UPDATE_FRAME:
3743         {
3744             StgAP_STACK * ap;
3745             nat words;
3746             
3747             // First build an AP_STACK consisting of the stack chunk above the
3748             // current update frame, with the top word on the stack as the
3749             // fun field.
3750             //
3751             words = frame - sp - 1;
3752             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3753             
3754             ap->size = words;
3755             ap->fun  = (StgClosure *)sp[0];
3756             sp++;
3757             for(i=0; i < (nat)words; ++i) {
3758                 ap->payload[i] = (StgClosure *)*sp++;
3759             }
3760             
3761             SET_HDR(ap,&stg_AP_STACK_info,
3762                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3763             TICK_ALLOC_UP_THK(words+1,0);
3764             
3765             IF_DEBUG(scheduler,
3766                      debugBelch("sched: Updating ");
3767                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3768                      debugBelch(" with ");
3769                      printObj((StgClosure *)ap);
3770                 );
3771
3772             // Replace the updatee with an indirection
3773             //
3774             // Warning: if we're in a loop, more than one update frame on
3775             // the stack may point to the same object.  Be careful not to
3776             // overwrite an IND_OLDGEN in this case, because we'll screw
3777             // up the mutable lists.  To be on the safe side, don't
3778             // overwrite any kind of indirection at all.  See also
3779             // threadSqueezeStack in GC.c, where we have to make a similar
3780             // check.
3781             //
3782             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3783                 // revert the black hole
3784                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3785                                (StgClosure *)ap);
3786             }
3787             sp += sizeofW(StgUpdateFrame) - 1;
3788             sp[0] = (W_)ap; // push onto stack
3789             frame = sp + 1;
3790             continue; //no need to bump frame
3791         }
3792
3793         case STOP_FRAME:
3794             // We've stripped the entire stack, the thread is now dead.
3795             tso->what_next = ThreadKilled;
3796             tso->sp = frame + sizeofW(StgStopFrame);
3797             return;
3798
3799         case CATCH_FRAME:
3800             // If we find a CATCH_FRAME, and we've got an exception to raise,
3801             // then build the THUNK raise(exception), and leave it on
3802             // top of the CATCH_FRAME ready to enter.
3803             //
3804         {
3805 #ifdef PROFILING
3806             StgCatchFrame *cf = (StgCatchFrame *)frame;
3807 #endif
3808             StgThunk *raise;
3809             
3810             if (exception == NULL) break;
3811
3812             // we've got an exception to raise, so let's pass it to the
3813             // handler in this frame.
3814             //
3815             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3816             TICK_ALLOC_SE_THK(1,0);
3817             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3818             raise->payload[0] = exception;
3819             
3820             // throw away the stack from Sp up to the CATCH_FRAME.
3821             //
3822             sp = frame - 1;
3823             
3824             /* Ensure that async excpetions are blocked now, so we don't get
3825              * a surprise exception before we get around to executing the
3826              * handler.
3827              */
3828             if (tso->blocked_exceptions == NULL) {
3829                 tso->blocked_exceptions = END_TSO_QUEUE;
3830             }
3831
3832             /* Put the newly-built THUNK on top of the stack, ready to execute
3833              * when the thread restarts.
3834              */
3835             sp[0] = (W_)raise;
3836             sp[-1] = (W_)&stg_enter_info;
3837             tso->sp = sp-1;
3838             tso->what_next = ThreadRunGHC;
3839             IF_DEBUG(sanity, checkTSO(tso));
3840             return;
3841         }
3842             
3843         case ATOMICALLY_FRAME:
3844             if (stop_at_atomically) {
3845                 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3846                 stmCondemnTransaction(cap, tso -> trec);
3847 #ifdef REG_R1
3848                 tso->sp = frame;
3849 #else
3850                 // R1 is not a register: the return convention for IO in
3851                 // this case puts the return value on the stack, so we
3852                 // need to set up the stack to return to the atomically
3853                 // frame properly...
3854                 tso->sp = frame - 2;
3855                 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3856                 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3857 #endif
3858                 tso->what_next = ThreadRunGHC;
3859                 return;
3860             }
3861             // Not stop_at_atomically... fall through and abort the
3862             // transaction.
3863             
3864         case CATCH_RETRY_FRAME:
3865             // IF we find an ATOMICALLY_FRAME then we abort the
3866             // current transaction and propagate the exception.  In
3867             // this case (unlike ordinary exceptions) we do not care
3868             // whether the transaction is valid or not because its
3869             // possible validity cannot have caused the exception
3870             // and will not be visible after the abort.
3871             IF_DEBUG(stm,
3872                      debugBelch("Found atomically block delivering async exception\n"));
3873             StgTRecHeader *trec = tso -> trec;
3874             StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3875             stmAbortTransaction(cap, trec);
3876             tso -> trec = outer;
3877             break;
3878             
3879         default:
3880             break;
3881         }
3882
3883         // move on to the next stack frame
3884         frame += stack_frame_sizeW((StgClosure *)frame);
3885     }
3886
3887     // if we got here, then we stopped at stop_here
3888     ASSERT(stop_here != NULL);
3889 }
3890
3891 /* -----------------------------------------------------------------------------
3892    Deleting threads
3893
3894    This is used for interruption (^C) and forking, and corresponds to
3895    raising an exception but without letting the thread catch the
3896    exception.
3897    -------------------------------------------------------------------------- */
3898
3899 static void 
3900 deleteThread (Capability *cap, StgTSO *tso)
3901 {
3902   if (tso->why_blocked != BlockedOnCCall &&
3903       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3904       raiseAsync(cap,tso,NULL);
3905   }
3906 }
3907
3908 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3909 static void 
3910 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3911 { // for forkProcess only:
3912   // delete thread without giving it a chance to catch the KillThread exception
3913
3914   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3915       return;
3916   }
3917
3918   if (tso->why_blocked != BlockedOnCCall &&
3919       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3920       unblockThread(cap,tso);
3921   }
3922
3923   tso->what_next = ThreadKilled;
3924 }
3925 #endif
3926
3927 /* -----------------------------------------------------------------------------
3928    raiseExceptionHelper
3929    
3930    This function is called by the raise# primitve, just so that we can
3931    move some of the tricky bits of raising an exception from C-- into
3932    C.  Who knows, it might be a useful re-useable thing here too.
3933    -------------------------------------------------------------------------- */
3934
3935 StgWord
3936 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3937 {
3938     Capability *cap = regTableToCapability(reg);
3939     StgThunk *raise_closure = NULL;
3940     StgPtr p, next;
3941     StgRetInfoTable *info;
3942     //
3943     // This closure represents the expression 'raise# E' where E
3944     // is the exception raise.  It is used to overwrite all the
3945     // thunks which are currently under evaluataion.
3946     //
3947
3948     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
3949     // LDV profiling: stg_raise_info has THUNK as its closure
3950     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3951     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3952     // 1 does not cause any problem unless profiling is performed.
3953     // However, when LDV profiling goes on, we need to linearly scan
3954     // small object pool, where raise_closure is stored, so we should
3955     // use MIN_UPD_SIZE.
3956     //
3957     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3958     //                                 sizeofW(StgClosure)+1);
3959     //
3960
3961     //
3962     // Walk up the stack, looking for the catch frame.  On the way,
3963     // we update any closures pointed to from update frames with the
3964     // raise closure that we just built.
3965     //
3966     p = tso->sp;
3967     while(1) {
3968         info = get_ret_itbl((StgClosure *)p);
3969         next = p + stack_frame_sizeW((StgClosure *)p);
3970         switch (info->i.type) {
3971             
3972         case UPDATE_FRAME:
3973             // Only create raise_closure if we need to.
3974             if (raise_closure == NULL) {
3975                 raise_closure = 
3976                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3977                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3978                 raise_closure->payload[0] = exception;
3979             }
3980             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3981             p = next;
3982             continue;
3983
3984         case ATOMICALLY_FRAME:
3985             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3986             tso->sp = p;
3987             return ATOMICALLY_FRAME;
3988             
3989         case CATCH_FRAME:
3990             tso->sp = p;
3991             return CATCH_FRAME;
3992
3993         case CATCH_STM_FRAME:
3994             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3995             tso->sp = p;
3996             return CATCH_STM_FRAME;
3997             
3998         case STOP_FRAME:
3999             tso->sp = p;
4000             return STOP_FRAME;
4001
4002         case CATCH_RETRY_FRAME:
4003         default:
4004             p = next; 
4005             continue;
4006         }
4007     }
4008 }
4009
4010
4011 /* -----------------------------------------------------------------------------
4012    findRetryFrameHelper
4013
4014    This function is called by the retry# primitive.  It traverses the stack
4015    leaving tso->sp referring to the frame which should handle the retry.  
4016
4017    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
4018    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
4019
4020    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
4021    despite the similar implementation.
4022
4023    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
4024    not be created within memory transactions.
4025    -------------------------------------------------------------------------- */
4026
4027 StgWord
4028 findRetryFrameHelper (StgTSO *tso)
4029 {
4030   StgPtr           p, next;
4031   StgRetInfoTable *info;
4032
4033   p = tso -> sp;
4034   while (1) {
4035     info = get_ret_itbl((StgClosure *)p);
4036     next = p + stack_frame_sizeW((StgClosure *)p);
4037     switch (info->i.type) {
4038       
4039     case ATOMICALLY_FRAME:
4040       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
4041       tso->sp = p;
4042       return ATOMICALLY_FRAME;
4043       
4044     case CATCH_RETRY_FRAME:
4045       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
4046       tso->sp = p;
4047       return CATCH_RETRY_FRAME;
4048       
4049     case CATCH_STM_FRAME:
4050     default:
4051       ASSERT(info->i.type != CATCH_FRAME);
4052       ASSERT(info->i.type != STOP_FRAME);
4053       p = next; 
4054       continue;
4055     }
4056   }
4057 }
4058
4059 /* -----------------------------------------------------------------------------
4060    resurrectThreads is called after garbage collection on the list of
4061    threads found to be garbage.  Each of these threads will be woken
4062    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4063    on an MVar, or NonTermination if the thread was blocked on a Black
4064    Hole.
4065
4066    Locks: assumes we hold *all* the capabilities.
4067    -------------------------------------------------------------------------- */
4068
4069 void
4070 resurrectThreads (StgTSO *threads)
4071 {
4072     StgTSO *tso, *next;
4073     Capability *cap;
4074
4075     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4076         next = tso->global_link;
4077         tso->global_link = all_threads;
4078         all_threads = tso;
4079         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4080         
4081         // Wake up the thread on the Capability it was last on for a
4082         // bound thread, or last_free_capability otherwise.
4083         if (tso->bound) {
4084             cap = tso->bound->cap;
4085         } else {
4086             cap = last_free_capability;
4087         }
4088         
4089         switch (tso->why_blocked) {
4090         case BlockedOnMVar:
4091         case BlockedOnException:
4092             /* Called by GC - sched_mutex lock is currently held. */
4093             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4094             break;
4095         case BlockedOnBlackHole:
4096             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4097             break;
4098         case BlockedOnSTM:
4099             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4100             break;
4101         case NotBlocked:
4102             /* This might happen if the thread was blocked on a black hole
4103              * belonging to a thread that we've just woken up (raiseAsync
4104              * can wake up threads, remember...).
4105              */
4106             continue;
4107         default:
4108             barf("resurrectThreads: thread blocked in a strange way");
4109         }
4110     }
4111 }
4112
4113 /* ----------------------------------------------------------------------------
4114  * Debugging: why is a thread blocked
4115  * [Also provides useful information when debugging threaded programs
4116  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4117    ------------------------------------------------------------------------- */
4118
4119 #if DEBUG
4120 static void
4121 printThreadBlockage(StgTSO *tso)
4122 {
4123   switch (tso->why_blocked) {
4124   case BlockedOnRead:
4125     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4126     break;
4127   case BlockedOnWrite:
4128     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4129     break;
4130 #if defined(mingw32_HOST_OS)
4131     case BlockedOnDoProc:
4132     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4133     break;
4134 #endif
4135   case BlockedOnDelay:
4136     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4137     break;
4138   case BlockedOnMVar:
4139     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4140     break;
4141   case BlockedOnException:
4142     debugBelch("is blocked on delivering an exception to thread %d",
4143             tso->block_info.tso->id);
4144     break;
4145   case BlockedOnBlackHole:
4146     debugBelch("is blocked on a black hole");
4147     break;
4148   case NotBlocked:
4149     debugBelch("is not blocked");
4150     break;
4151 #if defined(PARALLEL_HASKELL)
4152   case BlockedOnGA:
4153     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4154             tso->block_info.closure, info_type(tso->block_info.closure));
4155     break;
4156   case BlockedOnGA_NoSend:
4157     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4158             tso->block_info.closure, info_type(tso->block_info.closure));
4159     break;
4160 #endif
4161   case BlockedOnCCall:
4162     debugBelch("is blocked on an external call");
4163     break;
4164   case BlockedOnCCall_NoUnblockExc:
4165     debugBelch("is blocked on an external call (exceptions were already blocked)");
4166     break;
4167   case BlockedOnSTM:
4168     debugBelch("is blocked on an STM operation");
4169     break;
4170   default:
4171     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4172          tso->why_blocked, tso->id, tso);
4173   }
4174 }
4175
4176 void
4177 printThreadStatus(StgTSO *t)
4178 {
4179     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4180     {
4181       void *label = lookupThreadLabel(t->id);
4182       if (label) debugBelch("[\"%s\"] ",(char *)label);
4183     }
4184     if (t->what_next == ThreadRelocated) {
4185         debugBelch("has been relocated...\n");
4186     } else {
4187         switch (t->what_next) {
4188         case ThreadKilled:
4189             debugBelch("has been killed");
4190             break;
4191         case ThreadComplete:
4192             debugBelch("has completed");
4193             break;
4194         default:
4195             printThreadBlockage(t);
4196         }
4197         debugBelch("\n");
4198     }
4199 }
4200
4201 void
4202 printAllThreads(void)
4203 {
4204   StgTSO *t, *next;
4205   nat i;
4206   Capability *cap;
4207
4208 # if defined(GRAN)
4209   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4210   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4211                        time_string, rtsFalse/*no commas!*/);
4212
4213   debugBelch("all threads at [%s]:\n", time_string);
4214 # elif defined(PARALLEL_HASKELL)
4215   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4216   ullong_format_string(CURRENT_TIME,
4217                        time_string, rtsFalse/*no commas!*/);
4218
4219   debugBelch("all threads at [%s]:\n", time_string);
4220 # else
4221   debugBelch("all threads:\n");
4222 # endif
4223
4224   for (i = 0; i < n_capabilities; i++) {
4225       cap = &capabilities[i];
4226       debugBelch("threads on capability %d:\n", cap->no);
4227       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4228           printThreadStatus(t);
4229       }
4230   }
4231
4232   debugBelch("other threads:\n");
4233   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4234       if (t->why_blocked != NotBlocked) {
4235           printThreadStatus(t);
4236       }
4237       if (t->what_next == ThreadRelocated) {
4238           next = t->link;
4239       } else {
4240           next = t->global_link;
4241       }
4242   }
4243 }
4244
4245 // useful from gdb
4246 void 
4247 printThreadQueue(StgTSO *t)
4248 {
4249     nat i = 0;
4250     for (; t != END_TSO_QUEUE; t = t->link) {
4251         printThreadStatus(t);
4252         i++;
4253     }
4254     debugBelch("%d threads on queue\n", i);
4255 }
4256
4257 /* 
4258    Print a whole blocking queue attached to node (debugging only).
4259 */
4260 # if defined(PARALLEL_HASKELL)
4261 void 
4262 print_bq (StgClosure *node)
4263 {
4264   StgBlockingQueueElement *bqe;
4265   StgTSO *tso;
4266   rtsBool end;
4267
4268   debugBelch("## BQ of closure %p (%s): ",
4269           node, info_type(node));
4270
4271   /* should cover all closures that may have a blocking queue */
4272   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4273          get_itbl(node)->type == FETCH_ME_BQ ||
4274          get_itbl(node)->type == RBH ||
4275          get_itbl(node)->type == MVAR);
4276     
4277   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4278
4279   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4280 }
4281
4282 /* 
4283    Print a whole blocking queue starting with the element bqe.
4284 */
4285 void 
4286 print_bqe (StgBlockingQueueElement *bqe)
4287 {
4288   rtsBool end;
4289
4290   /* 
4291      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4292   */
4293   for (end = (bqe==END_BQ_QUEUE);
4294        !end; // iterate until bqe points to a CONSTR
4295        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4296        bqe = end ? END_BQ_QUEUE : bqe->link) {
4297     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4298     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4299     /* types of closures that may appear in a blocking queue */
4300     ASSERT(get_itbl(bqe)->type == TSO ||           
4301            get_itbl(bqe)->type == BLOCKED_FETCH || 
4302            get_itbl(bqe)->type == CONSTR); 
4303     /* only BQs of an RBH end with an RBH_Save closure */
4304     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4305
4306     switch (get_itbl(bqe)->type) {
4307     case TSO:
4308       debugBelch(" TSO %u (%x),",
4309               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4310       break;
4311     case BLOCKED_FETCH:
4312       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4313               ((StgBlockedFetch *)bqe)->node, 
4314               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4315               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4316               ((StgBlockedFetch *)bqe)->ga.weight);
4317       break;
4318     case CONSTR:
4319       debugBelch(" %s (IP %p),",
4320               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4321                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4322                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4323                "RBH_Save_?"), get_itbl(bqe));
4324       break;
4325     default:
4326       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4327            info_type((StgClosure *)bqe)); // , node, info_type(node));
4328       break;
4329     }
4330   } /* for */
4331   debugBelch("\n");
4332 }
4333 # elif defined(GRAN)
4334 void 
4335 print_bq (StgClosure *node)
4336 {
4337   StgBlockingQueueElement *bqe;
4338   PEs node_loc, tso_loc;
4339   rtsBool end;
4340
4341   /* should cover all closures that may have a blocking queue */
4342   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4343          get_itbl(node)->type == FETCH_ME_BQ ||
4344          get_itbl(node)->type == RBH);
4345     
4346   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4347   node_loc = where_is(node);
4348
4349   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4350           node, info_type(node), node_loc);
4351
4352   /* 
4353      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4354   */
4355   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4356        !end; // iterate until bqe points to a CONSTR
4357        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4358     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4359     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4360     /* types of closures that may appear in a blocking queue */
4361     ASSERT(get_itbl(bqe)->type == TSO ||           
4362            get_itbl(bqe)->type == CONSTR); 
4363     /* only BQs of an RBH end with an RBH_Save closure */
4364     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4365
4366     tso_loc = where_is((StgClosure *)bqe);
4367     switch (get_itbl(bqe)->type) {
4368     case TSO:
4369       debugBelch(" TSO %d (%p) on [PE %d],",
4370               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4371       break;
4372     case CONSTR:
4373       debugBelch(" %s (IP %p),",
4374               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4375                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4376                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4377                "RBH_Save_?"), get_itbl(bqe));
4378       break;
4379     default:
4380       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4381            info_type((StgClosure *)bqe), node, info_type(node));
4382       break;
4383     }
4384   } /* for */
4385   debugBelch("\n");
4386 }
4387 # endif
4388
4389 #if defined(PARALLEL_HASKELL)
4390 static nat
4391 run_queue_len(void)
4392 {
4393     nat i;
4394     StgTSO *tso;
4395     
4396     for (i=0, tso=run_queue_hd; 
4397          tso != END_TSO_QUEUE;
4398          i++, tso=tso->link) {
4399         /* nothing */
4400     }
4401         
4402     return i;
4403 }
4404 #endif
4405
4406 void
4407 sched_belch(char *s, ...)
4408 {
4409     va_list ap;
4410     va_start(ap,s);
4411 #ifdef THREADED_RTS
4412     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4413 #elif defined(PARALLEL_HASKELL)
4414     debugBelch("== ");
4415 #else
4416     debugBelch("sched: ");
4417 #endif
4418     vdebugBelch(s, ap);
4419     debugBelch("\n");
4420     va_end(ap);
4421 }
4422
4423 #endif /* DEBUG */